summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-x.rr.yaml7
-rw-r--r--cmd/cli/reset.go19
-rw-r--r--cmd/cli/root.go45
-rw-r--r--cmd/cli/serve.go19
-rw-r--r--cmd/cli/workers.go29
-rw-r--r--cmd/main.go11
-rwxr-xr-xcmd/rrbin0 -> 25579539 bytes
-rwxr-xr-xgo.mod4
-rwxr-xr-xgo.sum4
-rwxr-xr-xpkg/worker/worker.go12
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go8
11 files changed, 69 insertions, 89 deletions
diff --git a/.rr.yaml b/.rr.yaml
index 93672ebe..3154f5b4 100755
--- a/.rr.yaml
+++ b/.rr.yaml
@@ -24,7 +24,7 @@ http:
forbid: [ ".php", ".exe", ".bat" ]
trustedSubnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ]
pool:
- numWorkers: 10
+ numWorkers: 3
maxJobs: 0
allocateTimeout: 60s
destroyTimeout: 60s
@@ -105,9 +105,4 @@ reload:
patterns: [ ".php", ".go",".md", ]
# directories to sync. If recursive is set to true,
# recursive sync will be applied only to the directories in `dirs` section
- dirs: [ "." ]
- rpc:
- recursive: true
- patterns: [ ".go" ]
- # to include all project directories from workdir, leave `dirs` empty or add a dot "."
dirs: [ "." ] \ No newline at end of file
diff --git a/cmd/cli/reset.go b/cmd/cli/reset.go
index 82cf8590..03b470e5 100644
--- a/cmd/cli/reset.go
+++ b/cmd/cli/reset.go
@@ -7,10 +7,14 @@ import (
"github.com/fatih/color"
"github.com/mattn/go-runewidth"
"github.com/spf13/cobra"
+ "github.com/spiral/errors"
"github.com/vbauerster/mpb/v5"
"github.com/vbauerster/mpb/v5/decor"
)
+const List string = "resetter.List"
+const Reset string = "resetter.Reset"
+
func init() {
root.AddCommand(&cobra.Command{
Use: "reset",
@@ -20,19 +24,22 @@ func init() {
}
func resetHandler(cmd *cobra.Command, args []string) error {
+ const op = errors.Op("reset handler")
client, err := RPCClient()
if err != nil {
return err
}
- defer client.Close()
+ defer func() {
+ _ = client.Close()
+ }()
var services []string
if len(args) != 0 {
services = args
} else {
- err = client.Call("resetter.List", true, &services)
+ err = client.Call(List, true, &services)
if err != nil {
- return err
+ return errors.E(op, err)
}
}
@@ -43,7 +50,7 @@ func resetHandler(cmd *cobra.Command, args []string) error {
for _, service := range services {
var (
bar *mpb.Bar
- name = runewidth.FillRight(fmt.Sprintf("Reset [%s]", color.HiYellowString(service)), 27)
+ name = runewidth.FillRight(fmt.Sprintf("Resetting plugin: [%s]", color.HiYellowString(service)), 27)
result = make(chan interface{})
)
@@ -61,9 +68,9 @@ func resetHandler(cmd *cobra.Command, args []string) error {
defer bar.Increment()
var done bool
- err = client.Call("resetter.Reset", service, &done)
+ err = client.Call(Reset, service, &done)
if err != nil {
- result <- err
+ result <- errors.E(op, err)
return
}
result <- nil
diff --git a/cmd/cli/root.go b/cmd/cli/root.go
index 00d192f6..febe410b 100644
--- a/cmd/cli/root.go
+++ b/cmd/cli/root.go
@@ -8,12 +8,9 @@ import (
"github.com/spiral/errors"
goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
- "github.com/spiral/roadrunner-plugins/logger"
rpcPlugin "github.com/spiral/roadrunner-plugins/rpc"
"github.com/spiral/roadrunner-plugins/config"
- "go.uber.org/zap"
- "go.uber.org/zap/zapcore"
"github.com/spf13/cobra"
"github.com/spiral/endure"
@@ -23,7 +20,6 @@ var (
WorkDir string
CfgFile string
Container *endure.Endure
- Logger *zap.Logger
cfg *config.Viper
root = &cobra.Command{
Use: "rr",
@@ -43,10 +39,6 @@ func init() {
root.PersistentFlags().StringVarP(&CfgFile, "config", "c", ".rr.yaml", "config file (default is .rr.yaml)")
root.PersistentFlags().StringVarP(&WorkDir, "WorkDir", "w", "", "work directory")
- // todo: properly handle debug level
- Logger = initLogger()
- // endureLogger := logger.NewZapAdapter(Logger)
-
cobra.OnInitialize(func() {
if CfgFile != "" {
if absPath, err := filepath.Abs(CfgFile); err == nil {
@@ -65,24 +57,19 @@ func init() {
}
}
- // todo: config is global, not only for serve
cfg = &config.Viper{}
cfg.Path = CfgFile
cfg.Prefix = "rr"
+ // register config
err := Container.Register(cfg)
if err != nil {
panic(err)
}
-
- err = Container.Register(&logger.ZapLogger{})
- if err != nil {
- panic(err)
- }
})
}
-// todo: improve
+// RPCClient is using to make a requests to the ./rr reset, ./rr workers
func RPCClient() (*rpc.Client, error) {
rpcConfig := &rpcPlugin.Config{}
@@ -108,31 +95,3 @@ func RPCClient() (*rpc.Client, error) {
return rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)), nil
}
-
-func initLogger() *zap.Logger {
- // todo: we do not need it
- cfg := zap.Config{
- Level: zap.NewAtomicLevelAt(zap.ErrorLevel),
- Encoding: "console",
- EncoderConfig: zapcore.EncoderConfig{
- MessageKey: "message",
- LevelKey: "level",
- TimeKey: "time",
- CallerKey: "caller",
- NameKey: "name",
- StacktraceKey: "stack",
- EncodeLevel: zapcore.CapitalLevelEncoder,
- EncodeTime: zapcore.ISO8601TimeEncoder,
- EncodeCaller: zapcore.ShortCallerEncoder,
- },
- OutputPaths: []string{"stderr"},
- ErrorOutputPaths: []string{"stderr"},
- }
-
- l, err := cfg.Build(zap.AddCaller())
- if err != nil {
- panic(err)
- }
-
- return l
-}
diff --git a/cmd/cli/serve.go b/cmd/cli/serve.go
index ace239fc..2fe54932 100644
--- a/cmd/cli/serve.go
+++ b/cmd/cli/serve.go
@@ -1,20 +1,20 @@
package cli
import (
+ "log"
"os"
"os/signal"
"syscall"
- "github.com/spiral/errors"
- "go.uber.org/zap"
-
"github.com/spf13/cobra"
+ "github.com/spiral/errors"
+ "go.uber.org/multierr"
)
func init() {
root.AddCommand(&cobra.Command{
Use: "serve",
- Short: "Start RoadRunner Temporal service(s)",
+ Short: "Start RoadRunner server",
RunE: handler,
})
}
@@ -25,6 +25,7 @@ func handler(cmd *cobra.Command, args []string) error {
We need to have path to the config at the RegisterTarget stage
But after cobra.Execute, because cobra fills up cli variables on this stage
*/
+
err := Container.Init()
if err != nil {
return errors.E(op, err)
@@ -43,14 +44,14 @@ func handler(cmd *cobra.Command, args []string) error {
for {
select {
case e := <-errCh:
- Logger.Error(e.Error.Error(), zap.String("service", e.VertexID))
+ err = multierr.Append(err, e.Error)
+ log.Printf("error occurred: %v, service: %s", e.Error.Error(), e.VertexID)
er := Container.Stop()
if er != nil {
- Logger.Error(e.Error.Error(), zap.String("service", e.VertexID))
- if er != nil {
- return errors.E(op, er)
- }
+ err = multierr.Append(err, er)
+ return errors.E(op, err)
}
+ return errors.E(op, err)
case <-c:
err = Container.Stop()
if err != nil {
diff --git a/cmd/cli/workers.go b/cmd/cli/workers.go
index e9c8ab2c..4bcbbdbd 100644
--- a/cmd/cli/workers.go
+++ b/cmd/cli/workers.go
@@ -2,27 +2,25 @@ package cli
import (
"fmt"
+ "log"
"net/rpc"
"os"
- "os/signal"
- "syscall"
"time"
tm "github.com/buger/goterm"
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/tools"
- "go.uber.org/zap"
-
"github.com/fatih/color"
"github.com/spf13/cobra"
+ "github.com/spiral/errors"
"github.com/spiral/roadrunner-plugins/informer"
+ "github.com/spiral/roadrunner/v2/tools"
)
var (
interactive bool
- stopSignal = make(chan os.Signal, 1)
)
+const InformerList string = "informer.List"
+
func init() {
workersCommand := &cobra.Command{
Use: "workers",
@@ -39,13 +37,11 @@ func init() {
)
root.AddCommand(workersCommand)
-
- signal.Notify(stopSignal, syscall.SIGTERM)
- signal.Notify(stopSignal, syscall.SIGINT)
}
func workersHandler(cmd *cobra.Command, args []string) error {
const op = errors.Op("workers handler")
+ // get RPC client
client, err := RPCClient()
if err != nil {
return err
@@ -53,17 +49,18 @@ func workersHandler(cmd *cobra.Command, args []string) error {
defer func() {
err := client.Close()
if err != nil {
- Logger.Error("error when closing RPCClient", zap.Error(err))
+ log.Printf("error when closing RPCClient: error %v", err)
}
}()
var plugins []string
+ // assume user wants to show workers from particular plugin
if len(args) != 0 {
plugins = args
} else {
- err = client.Call("informer.List", true, &plugins)
+ err = client.Call(InformerList, true, &plugins)
if err != nil {
- return err
+ return errors.E(op, err)
}
}
@@ -72,11 +69,11 @@ func workersHandler(cmd *cobra.Command, args []string) error {
}
tm.Clear()
+ tt := time.NewTicker(time.Second)
+ defer tt.Stop()
for {
select {
- case <-stopSignal:
- return nil
- case <-time.NewTicker(time.Second).C:
+ case <-tt.C:
tm.MoveCursor(1, 1)
err := showWorkers(plugins, client)
if err != nil {
diff --git a/cmd/main.go b/cmd/main.go
index ed304252..ac98c5a8 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -6,6 +6,7 @@ import (
"github.com/spiral/endure"
"github.com/spiral/roadrunner-plugins/http"
"github.com/spiral/roadrunner-plugins/informer"
+ "github.com/spiral/roadrunner-plugins/logger"
"github.com/spiral/roadrunner-plugins/metrics"
"github.com/spiral/roadrunner-plugins/redis"
"github.com/spiral/roadrunner-plugins/reload"
@@ -23,13 +24,23 @@ func main() {
}
err = cli.Container.RegisterAll(
+ // logger plugin
+ &logger.ZapLogger{},
+ // metrics plugin
&metrics.Plugin{},
+ // redis plugin (internal)
&redis.Plugin{},
+ // http server plugin
&http.Plugin{},
+ // reload plugin
&reload.Plugin{},
+ // informer plugin (./rr workers)
&informer.Plugin{},
+ // resetter plugin (./rr reset)
&resetter.Plugin{},
+ // rpc plugin (workers, reset)
&rpc.Plugin{},
+ // server plugin (NewWorker, NewWorkerPool)
&server.Plugin{},
)
if err != nil {
diff --git a/cmd/rr b/cmd/rr
new file mode 100755
index 00000000..b6fd15f4
--- /dev/null
+++ b/cmd/rr
Binary files differ
diff --git a/go.mod b/go.mod
index 97e99423..be16ada8 100755
--- a/go.mod
+++ b/go.mod
@@ -32,7 +32,7 @@ require (
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a
)
-//replace github.com/spiral/roadrunner-plugins/http v1.0.0 => ../roadrunner-plugins/http
+replace github.com/spiral/roadrunner-plugins/http v1.0.2 => ../roadrunner-plugins/http
//replace github.com/spiral/roadrunner-plugins/reload v1.0.0 => ../roadrunner-plugins/reload
//replace github.com/spiral/roadrunner-plugins/rpc v1.0.0 => ../roadrunner-plugins/rpc
-//replace github.com/spiral/roadrunner-plugins/server v1.0.1 => ../roadrunner-plugins/server
+replace github.com/spiral/roadrunner-plugins/server v1.0.3 => ../roadrunner-plugins/server
diff --git a/go.sum b/go.sum
index 491b8c25..0563a7d2 100755
--- a/go.sum
+++ b/go.sum
@@ -421,8 +421,8 @@ github.com/spiral/roadrunner-plugins/config v1.0.0 h1:RPYB8Ha/UeuBGRwtcqNb0uU8R5
github.com/spiral/roadrunner-plugins/config v1.0.0/go.mod h1:XcVJLFDUlYPvZ3kVzssmP4fJbEzUvVJf534+eZaotAo=
github.com/spiral/roadrunner-plugins/http v1.0.0 h1:8WGAuZOrkYZQWo6n13ip+ZtzhKugZZ+b5W+ivVr7FjI=
github.com/spiral/roadrunner-plugins/http v1.0.0/go.mod h1:37ReUuAKJDtXH3GjMjRH5q3plBXq5r5lUfltRTVZzDE=
-github.com/spiral/roadrunner-plugins/http v1.0.1 h1:5pK7390tTcW+LZXCiyH7KowZvcj4Gum/hfAGl8BALDY=
-github.com/spiral/roadrunner-plugins/http v1.0.1/go.mod h1:JRgVSgJRh6wDmVs1rFJHQ9PM0xtCpzX9vBtVDItXZ/E=
+github.com/spiral/roadrunner-plugins/http v1.0.2 h1:BRmdbx0DYlgYTkvmd1IDZJewC1y30GqBCKRjbTn3OLc=
+github.com/spiral/roadrunner-plugins/http v1.0.2/go.mod h1:JRgVSgJRh6wDmVs1rFJHQ9PM0xtCpzX9vBtVDItXZ/E=
github.com/spiral/roadrunner-plugins/informer v1.0.3 h1:6DXs8IRDjBvAr2mGKXVlhx1KLLfVqpXgvP2va79kR1s=
github.com/spiral/roadrunner-plugins/informer v1.0.3/go.mod h1:OPEJNADBbNQyx0/KuXQbY3Mqemo30vZh6duf6YpRf7M=
github.com/spiral/roadrunner-plugins/informer v1.0.4 h1:J2DXrQkRRyYDGw/6TIo7qF74Zn3AOHQe+9g0Hj5BwR4=
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index 6e9141c9..db182a3e 100755
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -191,6 +191,10 @@ func (w *Process) Wait() error {
const op = errors.Op("worker process wait")
err := multierr.Combine(w.cmd.Wait())
+ if w.State().Value() == internal.StateDestroyed {
+ return errors.E(op, err)
+ }
+
// at this point according to the documentation (see cmd.Wait comment)
// if worker finishes with an error, message will be written to the stderr first
// and then w.cmd.Wait return an error
@@ -249,6 +253,14 @@ func (w *Process) Stop() error {
// Kill kills underlying process, make sure to call Wait() func to gather
// error log from the stderr. Does not waits for process completion!
func (w *Process) Kill() error {
+ if w.State().Value() == internal.StateDestroyed {
+ err := w.cmd.Process.Signal(os.Kill)
+ if err != nil {
+ return err
+ }
+ return nil
+ }
+
w.state.Set(internal.StateKilling)
err := w.cmd.Process.Signal(os.Kill)
if err != nil {
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index 55191ce6..39d334ba 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -128,6 +128,8 @@ func (stack *Stack) Destroy(ctx context.Context) {
for i := 0; i < len(stack.workers); i++ {
// set state for the stack in the stack (unused at the moment)
stack.workers[i].State().Set(internal.StateDestroyed)
+ // kill the worker
+ _ = stack.workers[i].Kill()
}
stack.mutex.Unlock()
tt.Stop()
@@ -223,11 +225,6 @@ func (ww *workerWatcher) AllocateNew() error {
ww.stack.mutex.Unlock()
ww.PushWorker(sw)
- ww.events.Push(events.PoolEvent{
- Event: events.EventWorkerConstruct,
- Payload: sw,
- })
-
return nil
}
@@ -282,6 +279,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
if w.State().Value() == internal.StateDestroyed {
// worker was manually destroyed, no need to replace
+ ww.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
return
}