summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cmd/rr/http/reset.go (renamed from cmd/rr/http/reload.go)35
-rw-r--r--cmd/rr/http/workers.go70
-rw-r--r--http/config.go2
-rw-r--r--http/rpc.go130
-rw-r--r--http/service.go23
-rw-r--r--state.go2
6 files changed, 141 insertions, 121 deletions
diff --git a/cmd/rr/http/reload.go b/cmd/rr/http/reset.go
index 06149065..f5c100cc 100644
--- a/cmd/rr/http/reload.go
+++ b/cmd/rr/http/reset.go
@@ -24,34 +24,35 @@ import (
"errors"
"github.com/spf13/cobra"
rr "github.com/spiral/roadrunner/cmd/rr/cmd"
+ "github.com/spiral/roadrunner/rpc"
+ "github.com/spiral/roadrunner/service"
)
func init() {
rr.CLI.AddCommand(&cobra.Command{
- Use: "http:reload",
+ Use: "http:reset",
Short: "Reload RoadRunner worker pools for the HTTP service",
RunE: reloadHandler,
})
}
func reloadHandler(cmd *cobra.Command, args []string) error {
- if !rr.Container.Has("rpc") {
+ svc, st := rr.Container.Get(rpc.Name)
+ if st < service.StatusConfigured {
return errors.New("RPC service is not configured")
}
- return nil
- //todo: change
- //client, err := rr.Container.Get("rpc").(*rpc.Service).Client()
- //if err != nil {
- // return err
- //}
- //defer client.Close()
- //
- //var r string
- //if err := client.Call("http.Reset", true, &r); err != nil {
- // return err
- //}
- //
- //rr.Logger.Info("http.service: restarting worker pool")
- //return nil
+ client, err := svc.(*rpc.Service).Client()
+ if err != nil {
+ return err
+ }
+ defer client.Close()
+
+ var r string
+ if err := client.Call("http.Reset", true, &r); err != nil {
+ return err
+ }
+
+ rr.Logger.Info("http: restarting worker pool")
+ return nil
}
diff --git a/cmd/rr/http/workers.go b/cmd/rr/http/workers.go
index c9aa5b04..62730e25 100644
--- a/cmd/rr/http/workers.go
+++ b/cmd/rr/http/workers.go
@@ -24,6 +24,14 @@ import (
"errors"
"github.com/spf13/cobra"
rr "github.com/spiral/roadrunner/cmd/rr/cmd"
+ "github.com/spiral/roadrunner/rpc"
+ "github.com/spiral/roadrunner/service"
+ "github.com/spiral/roadrunner/http"
+ "github.com/olekukonko/tablewriter"
+ "os"
+ "strconv"
+ "time"
+ "github.com/fatih/color"
)
func init() {
@@ -35,33 +43,47 @@ func init() {
}
func workersHandler(cmd *cobra.Command, args []string) error {
- if !rr.Container.Has("rpc") {
+ svc, st := rr.Container.Get(rpc.Name)
+ if st < service.StatusConfigured {
return errors.New("RPC service is not configured")
}
- //todo: change
- //client, err := rr.Container.Get("rpc").(*rpc.Service).Client()
- //if err != nil {
- // return err
- //}
- //defer client.Close()
+ client, err := svc.(*rpc.Service).Client()
+ if err != nil {
+ return err
+ }
+ defer client.Close()
+
+ var r http.WorkerList
+ if err := client.Call("http.Workers", true, &r); err != nil {
+ panic(err)
+ }
+
+ tw := tablewriter.NewWriter(os.Stdout)
+ tw.SetHeader([]string{"PID", "Status", "Handled Jobs", "Alive"})
+
+ for _, w := range r.Workers {
+ tw.Append([]string{
+ color.YellowString(strconv.Itoa(w.Pid)),
+ renderStatus(w.Status),
+ renderJobs(w.NumJobs),
+ renderAlive(time.Unix(0, w.Created)),
+ })
+ }
+
+ tw.Render()
- //var r http.WorkerList
- //if err := client.Call("http.Workers", true, &r); err != nil {
- // panic(err)
- //}
- //
- //tw := tablewriter.NewWriter(os.Stdout)
- //tw.SetHeader([]string{"PID", "Status", "Num Execs"})
- //
- //for _, w := range r.Workers {
- // tw.Append([]string{
- // strconv.Itoa(w.Pid),
- // w.Status,
- // strconv.Itoa(int(w.NumExecs)),
- // })
- //}
- //
- //tw.Render()
return nil
}
+
+func renderStatus(status string) string {
+ return status
+}
+
+func renderJobs(number uint64) string {
+ return strconv.Itoa(int(number))
+}
+
+func renderAlive(t time.Time) string {
+ return time.Now().Sub(t).String()
+}
diff --git a/http/config.go b/http/config.go
index abb14d9a..d92b4c60 100644
--- a/http/config.go
+++ b/http/config.go
@@ -6,7 +6,7 @@ import (
// Configures RoadRunner HTTP server.
type Config struct {
- // Enable enables http service.
+ // Enable enables http svc.
Enable bool
// Address and port to handle as http server.
diff --git a/http/rpc.go b/http/rpc.go
index d9f911f4..2e33aad7 100644
--- a/http/rpc.go
+++ b/http/rpc.go
@@ -1,73 +1,61 @@
package http
-//
-//type rpcServer struct {
-// service *Service
-//}
-//
-//// WorkerList contains list of workers.
-//type WorkerList struct {
-// // Workers is list of workers.
-// Workers []utils.Worker `json:"workers"`
-//}
-//
-//// Reset resets underlying RR worker pool and restarts all of it's workers.
-//func (rpc *rpcServer) Reset(reset bool, r *string) error {
-// if rpc.service.srv == nil {
-// return errors.New("no http server")
-// }
-//
-// logrus.Info("http: restarting worker pool")
-// *r = "OK"
-//
-// err := rpc.service.srv.rr.Reset()
-// if err != nil {
-// logrus.Errorf("http: %s", err)
-// }
-//
-// return err
-//}
-//
-//// Workers returns list of active workers and their stats.
-//func (rpc *rpcServer) Workers(list bool, r *WorkerList) error {
-// if rpc.service.srv == nil {
-// return errors.New("no http server")
-// }
-//
-// r.Workers = utils.FetchWorkers(rpc.service.srv.rr)
-// return nil
-//}
-//
-//// Worker provides information about specific worker.
-//type Worker struct {
-// // Pid contains process id.
-// Pid int `json:"pid"`
-//
-// // Status of the worker.
-// Status string `json:"status"`
-//
-// // Number of worker executions.
-// NumExecs uint64 `json:"numExecs"`
-//
-// // Created is unix nano timestamp of worker creation time.
-// Created int64 `json:"created"`
-//
-// // Updated is unix nano timestamp of last worker execution.
-// Updated int64 `json:"updated"`
-//}
-//
-//// FetchWorkers fetches list of workers from RR Server.
-//func FetchWorkers(srv *roadrunner.Server) (result []Worker) {
-// for _, w := range srv.Workers() {
-// state := w.State()
-// result = append(result, Worker{
-// Pid: *w.Pid,
-// Status: state.String(),
-// NumExecs: state.NumExecs(),
-// Created: w.Created.UnixNano(),
-// Updated: state.Updated().UnixNano(),
-// })
-// }
-//
-// return
-//}
+import (
+ "github.com/pkg/errors"
+)
+
+type rpcServer struct{ svc *Service }
+
+// WorkerList contains list of workers.
+type WorkerList struct {
+ // Workers is list of workers.
+ Workers []Worker `json:"workers"`
+}
+
+// Worker provides information about specific worker.
+type Worker struct {
+ // Pid contains process id.
+ Pid int `json:"pid"`
+
+ // Status of the worker.
+ Status string `json:"status"`
+
+ // Number of worker executions.
+ NumJobs uint64 `json:"numExecs"`
+
+ // Created is unix nano timestamp of worker creation time.
+ Created int64 `json:"created"`
+
+ // Updated is unix nano timestamp of last worker execution.
+ Updated int64 `json:"updated"`
+}
+
+// Reset resets underlying RR worker pool and restarts all of it's workers.
+func (rpc *rpcServer) Reset(reset bool, r *string) error {
+ if rpc.svc.srv == nil {
+ return errors.New("http server is not running")
+ }
+
+ *r = "OK"
+ return rpc.svc.srv.rr.Reset()
+}
+
+// Workers returns list of active workers and their stats.
+func (rpc *rpcServer) Workers(list bool, r *WorkerList) error {
+ if rpc.svc.srv == nil {
+ return errors.New("http server is not running")
+ }
+
+ for _, w := range rpc.svc.rr.Workers() {
+ state := w.State()
+ r.Workers = append(r.Workers, Worker{
+ Pid: *w.Pid,
+ Status: state.String(),
+ NumJobs: state.NumExecs(),
+ Created: w.Created.UnixNano(),
+ Updated: state.Updated().UnixNano(),
+ })
+ }
+
+ return nil
+}
diff --git a/http/service.go b/http/service.go
index dc996147..81ffb8a2 100644
--- a/http/service.go
+++ b/http/service.go
@@ -5,9 +5,10 @@ import (
"github.com/spiral/roadrunner/service"
"context"
"github.com/spiral/roadrunner"
+ "github.com/spiral/roadrunner/rpc"
)
-// Name contains default service name.
+// Name contains default svc name.
const Name = "http"
type Middleware interface {
@@ -17,8 +18,11 @@ type Middleware interface {
// Service manages rr, http servers.
type Service struct {
- cfg *Config
- listener func(event int, ctx interface{})
+ cfg *Config
+
+ // todo: multiple listeners
+ listener func(event int, ctx interface{})
+
middleware []Middleware
rr *roadrunner.Server
srv *Server
@@ -34,7 +38,7 @@ func (s *Service) Listen(o func(event int, ctx interface{})) {
s.listener = o
}
-// Configure must return configure service and return true if service hasStatus enabled. Must return error in case of
+// Configure must return configure svc and return true if svc hasStatus enabled. Must return error in case of
// misconfiguration. Services must not be used without proper configuration pushed first.
func (s *Service) Configure(cfg service.Config, c service.Container) (bool, error) {
config := &Config{}
@@ -52,12 +56,17 @@ func (s *Service) Configure(cfg service.Config, c service.Container) (bool, erro
s.cfg = config
- // todo: RPC
+ // registering http RPC interface
+ if r, ok := c.Get(rpc.Name); ok >= service.StatusConfigured {
+ if h, ok := r.(*rpc.Service); ok {
+ h.Register(Name, &rpcServer{s})
+ }
+ }
return true, nil
}
-// Serve serves the service.
+// Serve serves the svc.
func (s *Service) Serve() error {
rr := roadrunner.NewServer(s.cfg.Workers)
@@ -86,7 +95,7 @@ func (s *Service) Serve() error {
return nil
}
-// Stop stops the service.
+// Stop stops the svc.
func (s *Service) Stop() {
if s.http == nil {
return
diff --git a/state.go b/state.go
index 7bd5437d..da4d2b6f 100644
--- a/state.go
+++ b/state.go
@@ -14,7 +14,7 @@ type State interface {
// Value returns state value
Value() int64
- // NumExecs shows how many times worker was invoked
+ // NumJobs shows how many times worker was invoked
NumExecs() uint64
// Updated indicates a moment updated last state change