diff options
author | Valery Piashchynski <[email protected]> | 2020-12-23 23:35:44 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2020-12-23 23:35:44 +0300 |
commit | e78c13d93a729a9008f283aa3db6910f3e833165 (patch) | |
tree | e633604841b80c42b195388429dc12349e4b9d1f | |
parent | c21fe8e2f99c3cb8e3c1482e35f2efa0e914c337 (diff) |
Initial implementation of serve, reset, workers commands for the RR2
-rwxr-xr-x | .rr.yaml | 7 | ||||
-rw-r--r-- | cmd/cli/reset.go | 19 | ||||
-rw-r--r-- | cmd/cli/root.go | 45 | ||||
-rw-r--r-- | cmd/cli/serve.go | 19 | ||||
-rw-r--r-- | cmd/cli/workers.go | 29 | ||||
-rw-r--r-- | cmd/main.go | 11 | ||||
-rwxr-xr-x | cmd/rr | bin | 0 -> 25579539 bytes | |||
-rwxr-xr-x | go.mod | 4 | ||||
-rwxr-xr-x | go.sum | 4 | ||||
-rwxr-xr-x | pkg/worker/worker.go | 12 | ||||
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 8 |
11 files changed, 69 insertions, 89 deletions
@@ -24,7 +24,7 @@ http: forbid: [ ".php", ".exe", ".bat" ] trustedSubnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ] pool: - numWorkers: 10 + numWorkers: 3 maxJobs: 0 allocateTimeout: 60s destroyTimeout: 60s @@ -105,9 +105,4 @@ reload: patterns: [ ".php", ".go",".md", ] # directories to sync. If recursive is set to true, # recursive sync will be applied only to the directories in `dirs` section - dirs: [ "." ] - rpc: - recursive: true - patterns: [ ".go" ] - # to include all project directories from workdir, leave `dirs` empty or add a dot "." dirs: [ "." ]
\ No newline at end of file diff --git a/cmd/cli/reset.go b/cmd/cli/reset.go index 82cf8590..03b470e5 100644 --- a/cmd/cli/reset.go +++ b/cmd/cli/reset.go @@ -7,10 +7,14 @@ import ( "github.com/fatih/color" "github.com/mattn/go-runewidth" "github.com/spf13/cobra" + "github.com/spiral/errors" "github.com/vbauerster/mpb/v5" "github.com/vbauerster/mpb/v5/decor" ) +const List string = "resetter.List" +const Reset string = "resetter.Reset" + func init() { root.AddCommand(&cobra.Command{ Use: "reset", @@ -20,19 +24,22 @@ func init() { } func resetHandler(cmd *cobra.Command, args []string) error { + const op = errors.Op("reset handler") client, err := RPCClient() if err != nil { return err } - defer client.Close() + defer func() { + _ = client.Close() + }() var services []string if len(args) != 0 { services = args } else { - err = client.Call("resetter.List", true, &services) + err = client.Call(List, true, &services) if err != nil { - return err + return errors.E(op, err) } } @@ -43,7 +50,7 @@ func resetHandler(cmd *cobra.Command, args []string) error { for _, service := range services { var ( bar *mpb.Bar - name = runewidth.FillRight(fmt.Sprintf("Reset [%s]", color.HiYellowString(service)), 27) + name = runewidth.FillRight(fmt.Sprintf("Resetting plugin: [%s]", color.HiYellowString(service)), 27) result = make(chan interface{}) ) @@ -61,9 +68,9 @@ func resetHandler(cmd *cobra.Command, args []string) error { defer bar.Increment() var done bool - err = client.Call("resetter.Reset", service, &done) + err = client.Call(Reset, service, &done) if err != nil { - result <- err + result <- errors.E(op, err) return } result <- nil diff --git a/cmd/cli/root.go b/cmd/cli/root.go index 00d192f6..febe410b 100644 --- a/cmd/cli/root.go +++ b/cmd/cli/root.go @@ -8,12 +8,9 @@ import ( "github.com/spiral/errors" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" - "github.com/spiral/roadrunner-plugins/logger" rpcPlugin "github.com/spiral/roadrunner-plugins/rpc" "github.com/spiral/roadrunner-plugins/config" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" "github.com/spf13/cobra" "github.com/spiral/endure" @@ -23,7 +20,6 @@ var ( WorkDir string CfgFile string Container *endure.Endure - Logger *zap.Logger cfg *config.Viper root = &cobra.Command{ Use: "rr", @@ -43,10 +39,6 @@ func init() { root.PersistentFlags().StringVarP(&CfgFile, "config", "c", ".rr.yaml", "config file (default is .rr.yaml)") root.PersistentFlags().StringVarP(&WorkDir, "WorkDir", "w", "", "work directory") - // todo: properly handle debug level - Logger = initLogger() - // endureLogger := logger.NewZapAdapter(Logger) - cobra.OnInitialize(func() { if CfgFile != "" { if absPath, err := filepath.Abs(CfgFile); err == nil { @@ -65,24 +57,19 @@ func init() { } } - // todo: config is global, not only for serve cfg = &config.Viper{} cfg.Path = CfgFile cfg.Prefix = "rr" + // register config err := Container.Register(cfg) if err != nil { panic(err) } - - err = Container.Register(&logger.ZapLogger{}) - if err != nil { - panic(err) - } }) } -// todo: improve +// RPCClient is using to make a requests to the ./rr reset, ./rr workers func RPCClient() (*rpc.Client, error) { rpcConfig := &rpcPlugin.Config{} @@ -108,31 +95,3 @@ func RPCClient() (*rpc.Client, error) { return rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)), nil } - -func initLogger() *zap.Logger { - // todo: we do not need it - cfg := zap.Config{ - Level: zap.NewAtomicLevelAt(zap.ErrorLevel), - Encoding: "console", - EncoderConfig: zapcore.EncoderConfig{ - MessageKey: "message", - LevelKey: "level", - TimeKey: "time", - CallerKey: "caller", - NameKey: "name", - StacktraceKey: "stack", - EncodeLevel: zapcore.CapitalLevelEncoder, - EncodeTime: zapcore.ISO8601TimeEncoder, - EncodeCaller: zapcore.ShortCallerEncoder, - }, - OutputPaths: []string{"stderr"}, - ErrorOutputPaths: []string{"stderr"}, - } - - l, err := cfg.Build(zap.AddCaller()) - if err != nil { - panic(err) - } - - return l -} diff --git a/cmd/cli/serve.go b/cmd/cli/serve.go index ace239fc..2fe54932 100644 --- a/cmd/cli/serve.go +++ b/cmd/cli/serve.go @@ -1,20 +1,20 @@ package cli import ( + "log" "os" "os/signal" "syscall" - "github.com/spiral/errors" - "go.uber.org/zap" - "github.com/spf13/cobra" + "github.com/spiral/errors" + "go.uber.org/multierr" ) func init() { root.AddCommand(&cobra.Command{ Use: "serve", - Short: "Start RoadRunner Temporal service(s)", + Short: "Start RoadRunner server", RunE: handler, }) } @@ -25,6 +25,7 @@ func handler(cmd *cobra.Command, args []string) error { We need to have path to the config at the RegisterTarget stage But after cobra.Execute, because cobra fills up cli variables on this stage */ + err := Container.Init() if err != nil { return errors.E(op, err) @@ -43,14 +44,14 @@ func handler(cmd *cobra.Command, args []string) error { for { select { case e := <-errCh: - Logger.Error(e.Error.Error(), zap.String("service", e.VertexID)) + err = multierr.Append(err, e.Error) + log.Printf("error occurred: %v, service: %s", e.Error.Error(), e.VertexID) er := Container.Stop() if er != nil { - Logger.Error(e.Error.Error(), zap.String("service", e.VertexID)) - if er != nil { - return errors.E(op, er) - } + err = multierr.Append(err, er) + return errors.E(op, err) } + return errors.E(op, err) case <-c: err = Container.Stop() if err != nil { diff --git a/cmd/cli/workers.go b/cmd/cli/workers.go index e9c8ab2c..4bcbbdbd 100644 --- a/cmd/cli/workers.go +++ b/cmd/cli/workers.go @@ -2,27 +2,25 @@ package cli import ( "fmt" + "log" "net/rpc" "os" - "os/signal" - "syscall" "time" tm "github.com/buger/goterm" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/tools" - "go.uber.org/zap" - "github.com/fatih/color" "github.com/spf13/cobra" + "github.com/spiral/errors" "github.com/spiral/roadrunner-plugins/informer" + "github.com/spiral/roadrunner/v2/tools" ) var ( interactive bool - stopSignal = make(chan os.Signal, 1) ) +const InformerList string = "informer.List" + func init() { workersCommand := &cobra.Command{ Use: "workers", @@ -39,13 +37,11 @@ func init() { ) root.AddCommand(workersCommand) - - signal.Notify(stopSignal, syscall.SIGTERM) - signal.Notify(stopSignal, syscall.SIGINT) } func workersHandler(cmd *cobra.Command, args []string) error { const op = errors.Op("workers handler") + // get RPC client client, err := RPCClient() if err != nil { return err @@ -53,17 +49,18 @@ func workersHandler(cmd *cobra.Command, args []string) error { defer func() { err := client.Close() if err != nil { - Logger.Error("error when closing RPCClient", zap.Error(err)) + log.Printf("error when closing RPCClient: error %v", err) } }() var plugins []string + // assume user wants to show workers from particular plugin if len(args) != 0 { plugins = args } else { - err = client.Call("informer.List", true, &plugins) + err = client.Call(InformerList, true, &plugins) if err != nil { - return err + return errors.E(op, err) } } @@ -72,11 +69,11 @@ func workersHandler(cmd *cobra.Command, args []string) error { } tm.Clear() + tt := time.NewTicker(time.Second) + defer tt.Stop() for { select { - case <-stopSignal: - return nil - case <-time.NewTicker(time.Second).C: + case <-tt.C: tm.MoveCursor(1, 1) err := showWorkers(plugins, client) if err != nil { diff --git a/cmd/main.go b/cmd/main.go index ed304252..ac98c5a8 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -6,6 +6,7 @@ import ( "github.com/spiral/endure" "github.com/spiral/roadrunner-plugins/http" "github.com/spiral/roadrunner-plugins/informer" + "github.com/spiral/roadrunner-plugins/logger" "github.com/spiral/roadrunner-plugins/metrics" "github.com/spiral/roadrunner-plugins/redis" "github.com/spiral/roadrunner-plugins/reload" @@ -23,13 +24,23 @@ func main() { } err = cli.Container.RegisterAll( + // logger plugin + &logger.ZapLogger{}, + // metrics plugin &metrics.Plugin{}, + // redis plugin (internal) &redis.Plugin{}, + // http server plugin &http.Plugin{}, + // reload plugin &reload.Plugin{}, + // informer plugin (./rr workers) &informer.Plugin{}, + // resetter plugin (./rr reset) &resetter.Plugin{}, + // rpc plugin (workers, reset) &rpc.Plugin{}, + // server plugin (NewWorker, NewWorkerPool) &server.Plugin{}, ) if err != nil { Binary files differ@@ -32,7 +32,7 @@ require ( golang.org/x/sync v0.0.0-20201207232520-09787c993a3a ) -//replace github.com/spiral/roadrunner-plugins/http v1.0.0 => ../roadrunner-plugins/http +replace github.com/spiral/roadrunner-plugins/http v1.0.2 => ../roadrunner-plugins/http //replace github.com/spiral/roadrunner-plugins/reload v1.0.0 => ../roadrunner-plugins/reload //replace github.com/spiral/roadrunner-plugins/rpc v1.0.0 => ../roadrunner-plugins/rpc -//replace github.com/spiral/roadrunner-plugins/server v1.0.1 => ../roadrunner-plugins/server +replace github.com/spiral/roadrunner-plugins/server v1.0.3 => ../roadrunner-plugins/server @@ -421,8 +421,8 @@ github.com/spiral/roadrunner-plugins/config v1.0.0 h1:RPYB8Ha/UeuBGRwtcqNb0uU8R5 github.com/spiral/roadrunner-plugins/config v1.0.0/go.mod h1:XcVJLFDUlYPvZ3kVzssmP4fJbEzUvVJf534+eZaotAo= github.com/spiral/roadrunner-plugins/http v1.0.0 h1:8WGAuZOrkYZQWo6n13ip+ZtzhKugZZ+b5W+ivVr7FjI= github.com/spiral/roadrunner-plugins/http v1.0.0/go.mod h1:37ReUuAKJDtXH3GjMjRH5q3plBXq5r5lUfltRTVZzDE= -github.com/spiral/roadrunner-plugins/http v1.0.1 h1:5pK7390tTcW+LZXCiyH7KowZvcj4Gum/hfAGl8BALDY= -github.com/spiral/roadrunner-plugins/http v1.0.1/go.mod h1:JRgVSgJRh6wDmVs1rFJHQ9PM0xtCpzX9vBtVDItXZ/E= +github.com/spiral/roadrunner-plugins/http v1.0.2 h1:BRmdbx0DYlgYTkvmd1IDZJewC1y30GqBCKRjbTn3OLc= +github.com/spiral/roadrunner-plugins/http v1.0.2/go.mod h1:JRgVSgJRh6wDmVs1rFJHQ9PM0xtCpzX9vBtVDItXZ/E= github.com/spiral/roadrunner-plugins/informer v1.0.3 h1:6DXs8IRDjBvAr2mGKXVlhx1KLLfVqpXgvP2va79kR1s= github.com/spiral/roadrunner-plugins/informer v1.0.3/go.mod h1:OPEJNADBbNQyx0/KuXQbY3Mqemo30vZh6duf6YpRf7M= github.com/spiral/roadrunner-plugins/informer v1.0.4 h1:J2DXrQkRRyYDGw/6TIo7qF74Zn3AOHQe+9g0Hj5BwR4= diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 6e9141c9..db182a3e 100755 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -191,6 +191,10 @@ func (w *Process) Wait() error { const op = errors.Op("worker process wait") err := multierr.Combine(w.cmd.Wait()) + if w.State().Value() == internal.StateDestroyed { + return errors.E(op, err) + } + // at this point according to the documentation (see cmd.Wait comment) // if worker finishes with an error, message will be written to the stderr first // and then w.cmd.Wait return an error @@ -249,6 +253,14 @@ func (w *Process) Stop() error { // Kill kills underlying process, make sure to call Wait() func to gather // error log from the stderr. Does not waits for process completion! func (w *Process) Kill() error { + if w.State().Value() == internal.StateDestroyed { + err := w.cmd.Process.Signal(os.Kill) + if err != nil { + return err + } + return nil + } + w.state.Set(internal.StateKilling) err := w.cmd.Process.Signal(os.Kill) if err != nil { diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index 55191ce6..39d334ba 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -128,6 +128,8 @@ func (stack *Stack) Destroy(ctx context.Context) { for i := 0; i < len(stack.workers); i++ { // set state for the stack in the stack (unused at the moment) stack.workers[i].State().Set(internal.StateDestroyed) + // kill the worker + _ = stack.workers[i].Kill() } stack.mutex.Unlock() tt.Stop() @@ -223,11 +225,6 @@ func (ww *workerWatcher) AllocateNew() error { ww.stack.mutex.Unlock() ww.PushWorker(sw) - ww.events.Push(events.PoolEvent{ - Event: events.EventWorkerConstruct, - Payload: sw, - }) - return nil } @@ -282,6 +279,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { if w.State().Value() == internal.StateDestroyed { // worker was manually destroyed, no need to replace + ww.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w}) return } |