From d40ff179e43a02726bfa4298e523a16c79a88cea Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Mon, 16 Nov 2020 15:11:27 +0300 Subject: Rename app->server Rename Config -> PoolConfig --- plugins/app/config.go | 37 --- plugins/app/plugin.go | 180 ----------- plugins/app/tests/app_test.go | 358 --------------------- plugins/app/tests/configs/.rr-no-app-section.yaml | 9 - plugins/app/tests/configs/.rr-sockets.yaml | 9 - plugins/app/tests/configs/.rr-tcp.yaml | 9 - plugins/app/tests/configs/.rr-wrong-command.yaml | 9 - plugins/app/tests/configs/.rr-wrong-relay.yaml | 9 - plugins/app/tests/configs/.rr.yaml | 9 - plugins/app/tests/plugin_pipes.go | 130 -------- plugins/app/tests/plugin_sockets.go | 111 ------- plugins/app/tests/plugin_tcp.go | 111 ------- plugins/app/tests/socket.php | 25 -- plugins/app/tests/tcp.php | 20 -- plugins/server/config.go | 41 +++ plugins/server/plugin.go | 172 ++++++++++ .../server/tests/configs/.rr-no-app-section.yaml | 9 + plugins/server/tests/configs/.rr-sockets.yaml | 9 + plugins/server/tests/configs/.rr-tcp.yaml | 9 + .../server/tests/configs/.rr-wrong-command.yaml | 9 + plugins/server/tests/configs/.rr-wrong-relay.yaml | 9 + plugins/server/tests/configs/.rr.yaml | 9 + plugins/server/tests/plugin_pipes.go | 131 ++++++++ plugins/server/tests/plugin_sockets.go | 112 +++++++ plugins/server/tests/plugin_tcp.go | 112 +++++++ plugins/server/tests/server_test.go | 358 +++++++++++++++++++++ plugins/server/tests/socket.php | 25 ++ plugins/server/tests/tcp.php | 20 ++ 28 files changed, 1025 insertions(+), 1026 deletions(-) delete mode 100644 plugins/app/config.go delete mode 100644 plugins/app/plugin.go delete mode 100644 plugins/app/tests/app_test.go delete mode 100644 plugins/app/tests/configs/.rr-no-app-section.yaml delete mode 100644 plugins/app/tests/configs/.rr-sockets.yaml delete mode 100644 plugins/app/tests/configs/.rr-tcp.yaml delete mode 100644 plugins/app/tests/configs/.rr-wrong-command.yaml delete mode 100644 plugins/app/tests/configs/.rr-wrong-relay.yaml delete mode 100644 plugins/app/tests/configs/.rr.yaml delete mode 100644 plugins/app/tests/plugin_pipes.go delete mode 100644 plugins/app/tests/plugin_sockets.go delete mode 100644 plugins/app/tests/plugin_tcp.go delete mode 100644 plugins/app/tests/socket.php delete mode 100644 plugins/app/tests/tcp.php create mode 100644 plugins/server/config.go create mode 100644 plugins/server/plugin.go create mode 100644 plugins/server/tests/configs/.rr-no-app-section.yaml create mode 100644 plugins/server/tests/configs/.rr-sockets.yaml create mode 100644 plugins/server/tests/configs/.rr-tcp.yaml create mode 100644 plugins/server/tests/configs/.rr-wrong-command.yaml create mode 100644 plugins/server/tests/configs/.rr-wrong-relay.yaml create mode 100644 plugins/server/tests/configs/.rr.yaml create mode 100644 plugins/server/tests/plugin_pipes.go create mode 100644 plugins/server/tests/plugin_sockets.go create mode 100644 plugins/server/tests/plugin_tcp.go create mode 100644 plugins/server/tests/server_test.go create mode 100644 plugins/server/tests/socket.php create mode 100644 plugins/server/tests/tcp.php (limited to 'plugins') diff --git a/plugins/app/config.go b/plugins/app/config.go deleted file mode 100644 index eaa54e2d..00000000 --- a/plugins/app/config.go +++ /dev/null @@ -1,37 +0,0 @@ -package app - -import "time" - -// Config config combines factory, pool and cmd configurations. -type Config struct { - // Command to run as application. - Command string - - // User to run application under. - User string - - // Group to run application under. - Group string - - // Env represents application environment. - Env Env - - // Listen defines connection method and factory to be used to connect to workers: - // "pipes", "tcp://:6001", "unix://rr.sock" - // This config section must not change on re-configuration. - Relay string - - // RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section - // must not change on re-configuration. Defaults to 60s. - RelayTimeout time.Duration -} - -func (cfg *Config) InitDefaults() { - if cfg.Relay == "" { - cfg.Relay = "pipes" - } - - if cfg.RelayTimeout == 0 { - cfg.RelayTimeout = time.Second * 60 - } -} diff --git a/plugins/app/plugin.go b/plugins/app/plugin.go deleted file mode 100644 index ed2880cc..00000000 --- a/plugins/app/plugin.go +++ /dev/null @@ -1,180 +0,0 @@ -package app - -import ( - "context" - "fmt" - "os" - "os/exec" - "strings" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2" - "github.com/spiral/roadrunner/v2/interfaces/log" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/util" -) - -const ServiceName = "app" - -type Env map[string]string - -// WorkerFactory creates workers for the application. -type WorkerFactory interface { - CmdFactory(env Env) (func() *exec.Cmd, error) - NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) - NewWorkerPool(ctx context.Context, opt roadrunner.Config, env Env) (roadrunner.Pool, error) -} - -// Plugin manages worker -type Plugin struct { - cfg Config - log log.Logger - factory roadrunner.Factory -} - -// Init application provider. -func (app *Plugin) Init(cfg config.Configurer, log log.Logger) error { - const op = errors.Op("Init") - err := cfg.UnmarshalKey(ServiceName, &app.cfg) - if err != nil { - return errors.E(op, errors.Init, err) - } - app.cfg.InitDefaults() - app.log = log - - return nil -} - -// Name contains service name. -func (app *Plugin) Name() string { - return ServiceName -} - -func (app *Plugin) Serve() chan error { - errCh := make(chan error, 1) - var err error - - app.factory, err = app.initFactory() - if err != nil { - errCh <- errors.E(errors.Op("init factory"), err) - } - - return errCh -} - -func (app *Plugin) Stop() error { - if app.factory == nil { - return nil - } - - return app.factory.Close(context.Background()) -} - -// CmdFactory provides worker command factory assocated with given context. -func (app *Plugin) CmdFactory(env Env) (func() *exec.Cmd, error) { - var cmdArgs []string - - // create command according to the config - cmdArgs = append(cmdArgs, strings.Split(app.cfg.Command, " ")...) - - return func() *exec.Cmd { - cmd := exec.Command(cmdArgs[0], cmdArgs[1:]...) - util.IsolateProcess(cmd) - - // if user is not empty, and OS is linux or macos - // execute php worker from that particular user - if app.cfg.User != "" { - err := util.ExecuteFromUser(cmd, app.cfg.User) - if err != nil { - return nil - } - } - - cmd.Env = app.setEnv(env) - - return cmd - }, nil -} - -// NewWorker issues new standalone worker. -func (app *Plugin) NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) { - const op = errors.Op("new worker") - spawnCmd, err := app.CmdFactory(env) - if err != nil { - return nil, errors.E(op, err) - } - - w, err := app.factory.SpawnWorkerWithContext(ctx, spawnCmd()) - if err != nil { - return nil, errors.E(op, err) - } - - w.AddListener(app.collectLogs) - - return w, nil -} - -// NewWorkerPool issues new worker pool. -func (app *Plugin) NewWorkerPool(ctx context.Context, opt roadrunner.Config, env Env) (roadrunner.Pool, error) { - spawnCmd, err := app.CmdFactory(env) - if err != nil { - return nil, err - } - - p, err := roadrunner.NewPool(ctx, spawnCmd, app.factory, opt) - if err != nil { - return nil, err - } - - p.AddListener(app.collectLogs) - - return p, nil -} - -// creates relay and worker factory. -func (app *Plugin) initFactory() (roadrunner.Factory, error) { - const op = errors.Op("network factory init") - if app.cfg.Relay == "" || app.cfg.Relay == "pipes" { - return roadrunner.NewPipeFactory(), nil - } - - dsn := strings.Split(app.cfg.Relay, "://") - if len(dsn) != 2 { - return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)")) - } - - lsn, err := util.CreateListener(app.cfg.Relay) - if err != nil { - return nil, errors.E(op, errors.Network, err) - } - - switch dsn[0] { - // sockets group - case "unix": - return roadrunner.NewSocketServer(lsn, app.cfg.RelayTimeout), nil - case "tcp": - return roadrunner.NewSocketServer(lsn, app.cfg.RelayTimeout), nil - default: - return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)")) - } -} - -func (app *Plugin) setEnv(e Env) []string { - env := append(os.Environ(), fmt.Sprintf("RR_RELAY=%s", app.cfg.Relay)) - for k, v := range e { - env = append(env, fmt.Sprintf("%s=%s", strings.ToUpper(k), v)) - } - - return env -} - -func (app *Plugin) collectLogs(event interface{}) { - if we, ok := event.(roadrunner.WorkerEvent); ok { - switch we.Event { - case roadrunner.EventWorkerError: - app.log.Error(we.Payload.(error).Error(), "pid", we.Worker.Pid()) - case roadrunner.EventWorkerLog: - app.log.Debug(strings.TrimRight(string(we.Payload.([]byte)), " \n\t"), "pid", we.Worker.Pid()) - } - } -} diff --git a/plugins/app/tests/app_test.go b/plugins/app/tests/app_test.go deleted file mode 100644 index 3c416b59..00000000 --- a/plugins/app/tests/app_test.go +++ /dev/null @@ -1,358 +0,0 @@ -package tests - -import ( - "os" - "os/signal" - "testing" - "time" - - "github.com/spiral/endure" - "github.com/spiral/roadrunner/v2/plugins/app" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/stretchr/testify/assert" -) - -func TestAppPipes(t *testing.T) { - container, err := endure.NewContainer(nil, endure.RetryOnFail(true), endure.SetLogLevel(endure.DebugLevel)) - if err != nil { - t.Fatal(err) - } - // config plugin - vp := &config.Viper{} - vp.Path = "configs/.rr.yaml" - vp.Prefix = "rr" - err = container.Register(vp) - if err != nil { - t.Fatal(err) - } - - err = container.Register(&app.Plugin{}) - if err != nil { - t.Fatal(err) - } - - err = container.Register(&Foo{}) - if err != nil { - t.Fatal(err) - } - - err = container.Register(&logger.ZapLogger{}) - if err != nil { - t.Fatal(err) - } - - err = container.Init() - if err != nil { - t.Fatal(err) - } - - errCh, err := container.Serve() - if err != nil { - t.Fatal(err) - } - - // stop by CTRL+C - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt) - - // stop after 10 seconds - tt := time.NewTicker(time.Second * 10) - - for { - select { - case e := <-errCh: - assert.NoError(t, e.Error) - assert.NoError(t, container.Stop()) - return - case <-c: - er := container.Stop() - if er != nil { - panic(er) - } - return - case <-tt.C: - tt.Stop() - assert.NoError(t, container.Stop()) - return - } - } -} - -func TestAppSockets(t *testing.T) { - container, err := endure.NewContainer(nil, endure.RetryOnFail(true), endure.SetLogLevel(endure.DebugLevel)) - if err != nil { - t.Fatal(err) - } - // config plugin - vp := &config.Viper{} - vp.Path = "configs/.rr-sockets.yaml" - vp.Prefix = "rr" - err = container.Register(vp) - if err != nil { - t.Fatal(err) - } - - err = container.Register(&app.Plugin{}) - if err != nil { - t.Fatal(err) - } - - err = container.Register(&Foo2{}) - if err != nil { - t.Fatal(err) - } - - err = container.Register(&logger.ZapLogger{}) - if err != nil { - t.Fatal(err) - } - - err = container.Init() - if err != nil { - t.Fatal(err) - } - - errCh, err := container.Serve() - if err != nil { - t.Fatal(err) - } - - // stop by CTRL+C - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt) - - // stop after 10 seconds - tt := time.NewTicker(time.Second * 10) - - for { - select { - case e := <-errCh: - assert.NoError(t, e.Error) - assert.NoError(t, container.Stop()) - return - case <-c: - er := container.Stop() - if er != nil { - panic(er) - } - return - case <-tt.C: - tt.Stop() - assert.NoError(t, container.Stop()) - return - } - } -} - -func TestAppTCP(t *testing.T) { - container, err := endure.NewContainer(nil, endure.RetryOnFail(true), endure.SetLogLevel(endure.DebugLevel)) - if err != nil { - t.Fatal(err) - } - // config plugin - vp := &config.Viper{} - vp.Path = "configs/.rr-tcp.yaml" - vp.Prefix = "rr" - err = container.Register(vp) - if err != nil { - t.Fatal(err) - } - - err = container.Register(&app.Plugin{}) - if err != nil { - t.Fatal(err) - } - - err = container.Register(&Foo3{}) - if err != nil { - t.Fatal(err) - } - - err = container.Register(&logger.ZapLogger{}) - if err != nil { - t.Fatal(err) - } - - err = container.Init() - if err != nil { - t.Fatal(err) - } - - errCh, err := container.Serve() - if err != nil { - t.Fatal(err) - } - - // stop by CTRL+C - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt) - - // stop after 10 seconds - tt := time.NewTicker(time.Second * 10) - - for { - select { - case e := <-errCh: - assert.NoError(t, e.Error) - assert.NoError(t, container.Stop()) - return - case <-c: - er := container.Stop() - if er != nil { - panic(er) - } - return - case <-tt.C: - tt.Stop() - assert.NoError(t, container.Stop()) - return - } - } -} - -func TestAppWrongConfig(t *testing.T) { - container, err := endure.NewContainer(nil, endure.RetryOnFail(true), endure.SetLogLevel(endure.DebugLevel)) - if err != nil { - t.Fatal(err) - } - // config plugin - vp := &config.Viper{} - vp.Path = "configs/.rrrrrrrrrr.yaml" - vp.Prefix = "rr" - err = container.Register(vp) - if err != nil { - t.Fatal(err) - } - - err = container.Register(&app.Plugin{}) - if err != nil { - t.Fatal(err) - } - - err = container.Register(&Foo3{}) - if err != nil { - t.Fatal(err) - } - - err = container.Register(&logger.ZapLogger{}) - if err != nil { - t.Fatal(err) - } - - assert.Error(t, container.Init()) -} - -func TestAppWrongRelay(t *testing.T) { - container, err := endure.NewContainer(nil, endure.RetryOnFail(true), endure.SetLogLevel(endure.DebugLevel)) - if err != nil { - t.Fatal(err) - } - // config plugin - vp := &config.Viper{} - vp.Path = "configs/.rr-wrong-relay.yaml" - vp.Prefix = "rr" - err = container.Register(vp) - if err != nil { - t.Fatal(err) - } - - err = container.Register(&app.Plugin{}) - if err != nil { - t.Fatal(err) - } - - err = container.Register(&Foo3{}) - if err != nil { - t.Fatal(err) - } - - err = container.Register(&logger.ZapLogger{}) - if err != nil { - t.Fatal(err) - } - - err = container.Init() - if err != nil { - t.Fatal(err) - } - - _, err = container.Serve() - assert.Error(t, err) -} - -func TestAppWrongCommand(t *testing.T) { - container, err := endure.NewContainer(nil, endure.RetryOnFail(true), endure.SetLogLevel(endure.DebugLevel)) - if err != nil { - t.Fatal(err) - } - // config plugin - vp := &config.Viper{} - vp.Path = "configs/.rr-wrong-command.yaml" - vp.Prefix = "rr" - err = container.Register(vp) - if err != nil { - t.Fatal(err) - } - - err = container.Register(&app.Plugin{}) - if err != nil { - t.Fatal(err) - } - - err = container.Register(&Foo3{}) - if err != nil { - t.Fatal(err) - } - - err = container.Register(&logger.ZapLogger{}) - if err != nil { - t.Fatal(err) - } - - err = container.Init() - if err != nil { - t.Fatal(err) - } - - _, err = container.Serve() - assert.Error(t, err) -} - -func TestAppNoAppSectionInConfig(t *testing.T) { - container, err := endure.NewContainer(nil, endure.RetryOnFail(true), endure.SetLogLevel(endure.DebugLevel)) - if err != nil { - t.Fatal(err) - } - // config plugin - vp := &config.Viper{} - vp.Path = "configs/.rr-wrong-command.yaml" - vp.Prefix = "rr" - err = container.Register(vp) - if err != nil { - t.Fatal(err) - } - - err = container.Register(&app.Plugin{}) - if err != nil { - t.Fatal(err) - } - - err = container.Register(&Foo3{}) - if err != nil { - t.Fatal(err) - } - - err = container.Register(&logger.ZapLogger{}) - if err != nil { - t.Fatal(err) - } - - err = container.Init() - if err != nil { - t.Fatal(err) - } - - _, err = container.Serve() - assert.Error(t, err) -} diff --git a/plugins/app/tests/configs/.rr-no-app-section.yaml b/plugins/app/tests/configs/.rr-no-app-section.yaml deleted file mode 100644 index d129ae8a..00000000 --- a/plugins/app/tests/configs/.rr-no-app-section.yaml +++ /dev/null @@ -1,9 +0,0 @@ -upp: - command: "php ../../../tests/client.php echo pipes" - user: "" - group: "" - env: - "RR_CONFIG": "/some/place/on/the/C134" - "RR_CONFIG2": "C138" - relay: "pipes" - relayTimeout: "20s" \ No newline at end of file diff --git a/plugins/app/tests/configs/.rr-sockets.yaml b/plugins/app/tests/configs/.rr-sockets.yaml deleted file mode 100644 index 9bd62693..00000000 --- a/plugins/app/tests/configs/.rr-sockets.yaml +++ /dev/null @@ -1,9 +0,0 @@ -app: - command: "php socket.php" - user: "" - group: "" - env: - "RR_CONFIG": "/some/place/on/the/C134" - "RR_CONFIG2": "C138" - relay: "unix://unix.sock" - relayTimeout: "20s" \ No newline at end of file diff --git a/plugins/app/tests/configs/.rr-tcp.yaml b/plugins/app/tests/configs/.rr-tcp.yaml deleted file mode 100644 index c5a26d37..00000000 --- a/plugins/app/tests/configs/.rr-tcp.yaml +++ /dev/null @@ -1,9 +0,0 @@ -app: - command: "php tcp.php" - user: "" - group: "" - env: - "RR_CONFIG": "/some/place/on/the/C134" - "RR_CONFIG2": "C138" - relay: "tcp://localhost:9999" - relayTimeout: "20s" \ No newline at end of file diff --git a/plugins/app/tests/configs/.rr-wrong-command.yaml b/plugins/app/tests/configs/.rr-wrong-command.yaml deleted file mode 100644 index 4bd019d3..00000000 --- a/plugins/app/tests/configs/.rr-wrong-command.yaml +++ /dev/null @@ -1,9 +0,0 @@ -app: - command: "php some_absent_file.php" - user: "" - group: "" - env: - "RR_CONFIG": "/some/place/on/the/C134" - "RR_CONFIG2": "C138" - relay: "pipes" - relayTimeout: "20s" diff --git a/plugins/app/tests/configs/.rr-wrong-relay.yaml b/plugins/app/tests/configs/.rr-wrong-relay.yaml deleted file mode 100644 index d8ffe8f8..00000000 --- a/plugins/app/tests/configs/.rr-wrong-relay.yaml +++ /dev/null @@ -1,9 +0,0 @@ -app: - command: "php ../../../tests/client.php echo pipes" - user: "" - group: "" - env: - "RR_CONFIG": "/some/place/on/the/C134" - "RR_CONFIG2": "C138" - relay: "pupes" - relayTimeout: "20s" \ No newline at end of file diff --git a/plugins/app/tests/configs/.rr.yaml b/plugins/app/tests/configs/.rr.yaml deleted file mode 100644 index 221aff92..00000000 --- a/plugins/app/tests/configs/.rr.yaml +++ /dev/null @@ -1,9 +0,0 @@ -app: - command: "php ../../../tests/client.php echo pipes" - user: "" - group: "" - env: - "RR_CONFIG": "/some/place/on/the/C134" - "RR_CONFIG2": "C138" - relay: "pipes" - relayTimeout: "20s" \ No newline at end of file diff --git a/plugins/app/tests/plugin_pipes.go b/plugins/app/tests/plugin_pipes.go deleted file mode 100644 index fc999718..00000000 --- a/plugins/app/tests/plugin_pipes.go +++ /dev/null @@ -1,130 +0,0 @@ -package tests - -import ( - "context" - "time" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2" - "github.com/spiral/roadrunner/v2/plugins/app" - "github.com/spiral/roadrunner/v2/plugins/config" -) - -const ConfigSection = "app" -const Response = "test" - -var testPoolConfig = roadrunner.Config{ - NumWorkers: 10, - MaxJobs: 100, - AllocateTimeout: time.Second * 10, - DestroyTimeout: time.Second * 10, - Supervisor: &roadrunner.SupervisorConfig{ - WatchTick: 60, - TTL: 1000, - IdleTTL: 10, - ExecTTL: 10, - MaxWorkerMemory: 1000, - }, -} - -type Foo struct { - configProvider config.Configurer - wf app.WorkerFactory - pool roadrunner.Pool -} - -func (f *Foo) Init(p config.Configurer, workerFactory app.WorkerFactory) error { - f.configProvider = p - f.wf = workerFactory - return nil -} - -func (f *Foo) Serve() chan error { - const op = errors.Op("serve") - - // test payload for echo - r := roadrunner.Payload{ - Context: nil, - Body: []byte(Response), - } - - errCh := make(chan error, 1) - - conf := &app.Config{} - var err error - err = f.configProvider.UnmarshalKey(ConfigSection, conf) - if err != nil { - errCh <- err - return errCh - } - - // test CMDFactory - cmd, err := f.wf.CmdFactory(nil) - if err != nil { - errCh <- err - return errCh - } - if cmd == nil { - errCh <- errors.E(op, "command is nil") - return errCh - } - - // test worker creation - w, err := f.wf.NewWorker(context.Background(), nil) - if err != nil { - errCh <- err - return errCh - } - - // test that our worker is functional - sw, err := roadrunner.NewSyncWorker(w) - if err != nil { - errCh <- err - return errCh - } - - rsp, err := sw.Exec(r) - if err != nil { - errCh <- err - return errCh - } - - if string(rsp.Body) != Response { - errCh <- errors.E("response from worker is wrong", errors.Errorf("response: %s", rsp.Body)) - return errCh - } - - // should not be errors - err = sw.Stop(context.Background()) - if err != nil { - errCh <- err - return errCh - } - - // test pool - f.pool, err = f.wf.NewWorkerPool(context.Background(), testPoolConfig, nil) - if err != nil { - errCh <- err - return errCh - } - - // test pool execution - rsp, err = f.pool.Exec(r) - if err != nil { - errCh <- err - return errCh - } - - // echo of the "test" should be -> test - if string(rsp.Body) != Response { - errCh <- errors.E("response from worker is wrong", errors.Errorf("response: %s", rsp.Body)) - return errCh - } - - return errCh -} - -func (f *Foo) Stop() error { - f.pool.Destroy(context.Background()) - return nil -} diff --git a/plugins/app/tests/plugin_sockets.go b/plugins/app/tests/plugin_sockets.go deleted file mode 100644 index 585264f6..00000000 --- a/plugins/app/tests/plugin_sockets.go +++ /dev/null @@ -1,111 +0,0 @@ -package tests - -import ( - "context" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2" - "github.com/spiral/roadrunner/v2/plugins/app" - "github.com/spiral/roadrunner/v2/plugins/config" -) - -type Foo2 struct { - configProvider config.Configurer - wf app.WorkerFactory - pool roadrunner.Pool -} - -func (f *Foo2) Init(p config.Configurer, workerFactory app.WorkerFactory) error { - f.configProvider = p - f.wf = workerFactory - return nil -} - -func (f *Foo2) Serve() chan error { - const op = errors.Op("serve") - var err error - errCh := make(chan error, 1) - conf := &app.Config{} - - // test payload for echo - r := roadrunner.Payload{ - Context: nil, - Body: []byte(Response), - } - - err = f.configProvider.UnmarshalKey(ConfigSection, conf) - if err != nil { - errCh <- err - return errCh - } - - // test CMDFactory - cmd, err := f.wf.CmdFactory(nil) - if err != nil { - errCh <- err - return errCh - } - if cmd == nil { - errCh <- errors.E(op, "command is nil") - return errCh - } - - // test worker creation - w, err := f.wf.NewWorker(context.Background(), nil) - if err != nil { - errCh <- err - return errCh - } - - // test that our worker is functional - sw, err := roadrunner.NewSyncWorker(w) - if err != nil { - errCh <- err - return errCh - } - - rsp, err := sw.Exec(r) - if err != nil { - errCh <- err - return errCh - } - - if string(rsp.Body) != Response { - errCh <- errors.E("response from worker is wrong", errors.Errorf("response: %s", rsp.Body)) - return errCh - } - - // should not be errors - err = sw.Stop(context.Background()) - if err != nil { - errCh <- err - return errCh - } - - // test pool - f.pool, err = f.wf.NewWorkerPool(context.Background(), testPoolConfig, nil) - if err != nil { - errCh <- err - return errCh - } - - // test pool execution - rsp, err = f.pool.Exec(r) - if err != nil { - errCh <- err - return errCh - } - - // echo of the "test" should be -> test - if string(rsp.Body) != Response { - errCh <- errors.E("response from worker is wrong", errors.Errorf("response: %s", rsp.Body)) - return errCh - } - - return errCh -} - -func (f *Foo2) Stop() error { - f.pool.Destroy(context.Background()) - return nil -} diff --git a/plugins/app/tests/plugin_tcp.go b/plugins/app/tests/plugin_tcp.go deleted file mode 100644 index 6abc533d..00000000 --- a/plugins/app/tests/plugin_tcp.go +++ /dev/null @@ -1,111 +0,0 @@ -package tests - -import ( - "context" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2" - "github.com/spiral/roadrunner/v2/plugins/app" - "github.com/spiral/roadrunner/v2/plugins/config" -) - -type Foo3 struct { - configProvider config.Configurer - wf app.WorkerFactory - pool roadrunner.Pool -} - -func (f *Foo3) Init(p config.Configurer, workerFactory app.WorkerFactory) error { - f.configProvider = p - f.wf = workerFactory - return nil -} - -func (f *Foo3) Serve() chan error { - const op = errors.Op("serve") - var err error - errCh := make(chan error, 1) - conf := &app.Config{} - - // test payload for echo - r := roadrunner.Payload{ - Context: nil, - Body: []byte(Response), - } - - err = f.configProvider.UnmarshalKey(ConfigSection, conf) - if err != nil { - errCh <- err - return errCh - } - - // test CMDFactory - cmd, err := f.wf.CmdFactory(nil) - if err != nil { - errCh <- err - return errCh - } - if cmd == nil { - errCh <- errors.E(op, "command is nil") - return errCh - } - - // test worker creation - w, err := f.wf.NewWorker(context.Background(), nil) - if err != nil { - errCh <- err - return errCh - } - - // test that our worker is functional - sw, err := roadrunner.NewSyncWorker(w) - if err != nil { - errCh <- err - return errCh - } - - rsp, err := sw.Exec(r) - if err != nil { - errCh <- err - return errCh - } - - if string(rsp.Body) != Response { - errCh <- errors.E("response from worker is wrong", errors.Errorf("response: %s", rsp.Body)) - return errCh - } - - // should not be errors - err = sw.Stop(context.Background()) - if err != nil { - errCh <- err - return errCh - } - - // test pool - f.pool, err = f.wf.NewWorkerPool(context.Background(), testPoolConfig, nil) - if err != nil { - errCh <- err - return errCh - } - - // test pool execution - rsp, err = f.pool.Exec(r) - if err != nil { - errCh <- err - return errCh - } - - // echo of the "test" should be -> test - if string(rsp.Body) != Response { - errCh <- errors.E("response from worker is wrong", errors.Errorf("response: %s", rsp.Body)) - return errCh - } - - return errCh -} - -func (f *Foo3) Stop() error { - f.pool.Destroy(context.Background()) - return nil -} diff --git a/plugins/app/tests/socket.php b/plugins/app/tests/socket.php deleted file mode 100644 index 143c3ce4..00000000 --- a/plugins/app/tests/socket.php +++ /dev/null @@ -1,25 +0,0 @@ -receive($ctx)) { - try { - $rr->send((string)$in); - } catch (\Throwable $e) { - $rr->error((string)$e); - } -} diff --git a/plugins/app/tests/tcp.php b/plugins/app/tests/tcp.php deleted file mode 100644 index 2d6fb00a..00000000 --- a/plugins/app/tests/tcp.php +++ /dev/null @@ -1,20 +0,0 @@ -receive($ctx)) { - try { - $rr->send((string)$in); - } catch (\Throwable $e) { - $rr->error((string)$e); - } -} \ No newline at end of file diff --git a/plugins/server/config.go b/plugins/server/config.go new file mode 100644 index 00000000..147ae0f7 --- /dev/null +++ b/plugins/server/config.go @@ -0,0 +1,41 @@ +package server + +import ( + "time" + + "github.com/spiral/roadrunner/v2/interfaces/server" +) + +// Config config combines factory, pool and cmd configurations. +type Config struct { + // Command to run as application. + Command string + + // User to run application under. + User string + + // Group to run application under. + Group string + + // Env represents application environment. + Env server.Env + + // Listen defines connection method and factory to be used to connect to workers: + // "pipes", "tcp://:6001", "unix://rr.sock" + // This config section must not change on re-configuration. + Relay string + + // RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section + // must not change on re-configuration. Defaults to 60s. + RelayTimeout time.Duration +} + +func (cfg *Config) InitDefaults() { + if cfg.Relay == "" { + cfg.Relay = "pipes" + } + + if cfg.RelayTimeout == 0 { + cfg.RelayTimeout = time.Second * 60 + } +} diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go new file mode 100644 index 00000000..e096708a --- /dev/null +++ b/plugins/server/plugin.go @@ -0,0 +1,172 @@ +package server + +import ( + "context" + "fmt" + "os" + "os/exec" + "strings" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2" + "github.com/spiral/roadrunner/v2/interfaces/log" + "github.com/spiral/roadrunner/v2/interfaces/server" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/util" +) + +const ServiceName = "server" + +// Plugin manages worker +type Plugin struct { + cfg Config + log log.Logger + factory roadrunner.Factory +} + +// Init application provider. +func (app *Plugin) Init(cfg config.Configurer, log log.Logger) error { + const op = errors.Op("Init") + err := cfg.UnmarshalKey(ServiceName, &app.cfg) + if err != nil { + return errors.E(op, errors.Init, err) + } + app.cfg.InitDefaults() + app.log = log + + return nil +} + +// Name contains service name. +func (app *Plugin) Name() string { + return ServiceName +} + +func (app *Plugin) Serve() chan error { + errCh := make(chan error, 1) + var err error + + app.factory, err = app.initFactory() + if err != nil { + errCh <- errors.E(errors.Op("init factory"), err) + } + + return errCh +} + +func (app *Plugin) Stop() error { + if app.factory == nil { + return nil + } + + return app.factory.Close(context.Background()) +} + +// CmdFactory provides worker command factory assocated with given context. +func (app *Plugin) CmdFactory(env server.Env) (func() *exec.Cmd, error) { + var cmdArgs []string + + // create command according to the config + cmdArgs = append(cmdArgs, strings.Split(app.cfg.Command, " ")...) + + return func() *exec.Cmd { + cmd := exec.Command(cmdArgs[0], cmdArgs[1:]...) + util.IsolateProcess(cmd) + + // if user is not empty, and OS is linux or macos + // execute php worker from that particular user + if app.cfg.User != "" { + err := util.ExecuteFromUser(cmd, app.cfg.User) + if err != nil { + return nil + } + } + + cmd.Env = app.setEnv(env) + + return cmd + }, nil +} + +// NewWorker issues new standalone worker. +func (app *Plugin) NewWorker(ctx context.Context, env server.Env) (roadrunner.WorkerBase, error) { + const op = errors.Op("new worker") + spawnCmd, err := app.CmdFactory(env) + if err != nil { + return nil, errors.E(op, err) + } + + w, err := app.factory.SpawnWorkerWithContext(ctx, spawnCmd()) + if err != nil { + return nil, errors.E(op, err) + } + + w.AddListener(app.collectLogs) + + return w, nil +} + +// NewWorkerPool issues new worker pool. +func (app *Plugin) NewWorkerPool(ctx context.Context, opt roadrunner.PoolConfig, env server.Env) (roadrunner.Pool, error) { + spawnCmd, err := app.CmdFactory(env) + if err != nil { + return nil, err + } + + p, err := roadrunner.NewPool(ctx, spawnCmd, app.factory, opt) + if err != nil { + return nil, err + } + + p.AddListener(app.collectLogs) + + return p, nil +} + +// creates relay and worker factory. +func (app *Plugin) initFactory() (roadrunner.Factory, error) { + const op = errors.Op("network factory init") + if app.cfg.Relay == "" || app.cfg.Relay == "pipes" { + return roadrunner.NewPipeFactory(), nil + } + + dsn := strings.Split(app.cfg.Relay, "://") + if len(dsn) != 2 { + return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)")) + } + + lsn, err := util.CreateListener(app.cfg.Relay) + if err != nil { + return nil, errors.E(op, errors.Network, err) + } + + switch dsn[0] { + // sockets group + case "unix": + return roadrunner.NewSocketServer(lsn, app.cfg.RelayTimeout), nil + case "tcp": + return roadrunner.NewSocketServer(lsn, app.cfg.RelayTimeout), nil + default: + return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)")) + } +} + +func (app *Plugin) setEnv(e server.Env) []string { + env := append(os.Environ(), fmt.Sprintf("RR_RELAY=%s", app.cfg.Relay)) + for k, v := range e { + env = append(env, fmt.Sprintf("%s=%s", strings.ToUpper(k), v)) + } + + return env +} + +func (app *Plugin) collectLogs(event interface{}) { + if we, ok := event.(roadrunner.WorkerEvent); ok { + switch we.Event { + case roadrunner.EventWorkerError: + app.log.Error(we.Payload.(error).Error(), "pid", we.Worker.Pid()) + case roadrunner.EventWorkerLog: + app.log.Debug(strings.TrimRight(string(we.Payload.([]byte)), " \n\t"), "pid", we.Worker.Pid()) + } + } +} diff --git a/plugins/server/tests/configs/.rr-no-app-section.yaml b/plugins/server/tests/configs/.rr-no-app-section.yaml new file mode 100644 index 00000000..b6e3ea93 --- /dev/null +++ b/plugins/server/tests/configs/.rr-no-app-section.yaml @@ -0,0 +1,9 @@ +server: + command: "php ../../../tests/client.php echo pipes" + user: "" + group: "" + env: + "RR_CONFIG": "/some/place/on/the/C134" + "RR_CONFIG2": "C138" + relay: "pipes" + relayTimeout: "20s" \ No newline at end of file diff --git a/plugins/server/tests/configs/.rr-sockets.yaml b/plugins/server/tests/configs/.rr-sockets.yaml new file mode 100644 index 00000000..ab1239aa --- /dev/null +++ b/plugins/server/tests/configs/.rr-sockets.yaml @@ -0,0 +1,9 @@ +server: + command: "php socket.php" + user: "" + group: "" + env: + "RR_CONFIG": "/some/place/on/the/C134" + "RR_CONFIG2": "C138" + relay: "unix://unix.sock" + relayTimeout: "20s" \ No newline at end of file diff --git a/plugins/server/tests/configs/.rr-tcp.yaml b/plugins/server/tests/configs/.rr-tcp.yaml new file mode 100644 index 00000000..f53bffcc --- /dev/null +++ b/plugins/server/tests/configs/.rr-tcp.yaml @@ -0,0 +1,9 @@ +server: + command: "php tcp.php" + user: "" + group: "" + env: + "RR_CONFIG": "/some/place/on/the/C134" + "RR_CONFIG2": "C138" + relay: "tcp://localhost:9999" + relayTimeout: "20s" \ No newline at end of file diff --git a/plugins/server/tests/configs/.rr-wrong-command.yaml b/plugins/server/tests/configs/.rr-wrong-command.yaml new file mode 100644 index 00000000..d2c087a6 --- /dev/null +++ b/plugins/server/tests/configs/.rr-wrong-command.yaml @@ -0,0 +1,9 @@ +server: + command: "php some_absent_file.php" + user: "" + group: "" + env: + "RR_CONFIG": "/some/place/on/the/C134" + "RR_CONFIG2": "C138" + relay: "pipes" + relayTimeout: "20s" diff --git a/plugins/server/tests/configs/.rr-wrong-relay.yaml b/plugins/server/tests/configs/.rr-wrong-relay.yaml new file mode 100644 index 00000000..1dd73d73 --- /dev/null +++ b/plugins/server/tests/configs/.rr-wrong-relay.yaml @@ -0,0 +1,9 @@ +server: + command: "php ../../../tests/client.php echo pipes" + user: "" + group: "" + env: + "RR_CONFIG": "/some/place/on/the/C134" + "RR_CONFIG2": "C138" + relay: "pupes" + relayTimeout: "20s" \ No newline at end of file diff --git a/plugins/server/tests/configs/.rr.yaml b/plugins/server/tests/configs/.rr.yaml new file mode 100644 index 00000000..b6e3ea93 --- /dev/null +++ b/plugins/server/tests/configs/.rr.yaml @@ -0,0 +1,9 @@ +server: + command: "php ../../../tests/client.php echo pipes" + user: "" + group: "" + env: + "RR_CONFIG": "/some/place/on/the/C134" + "RR_CONFIG2": "C138" + relay: "pipes" + relayTimeout: "20s" \ No newline at end of file diff --git a/plugins/server/tests/plugin_pipes.go b/plugins/server/tests/plugin_pipes.go new file mode 100644 index 00000000..840021eb --- /dev/null +++ b/plugins/server/tests/plugin_pipes.go @@ -0,0 +1,131 @@ +package tests + +import ( + "context" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2" + "github.com/spiral/roadrunner/v2/interfaces/server" + "github.com/spiral/roadrunner/v2/plugins/config" + plugin "github.com/spiral/roadrunner/v2/plugins/server" +) + +const ConfigSection = "app" +const Response = "test" + +var testPoolConfig = roadrunner.PoolConfig{ + NumWorkers: 10, + MaxJobs: 100, + AllocateTimeout: time.Second * 10, + DestroyTimeout: time.Second * 10, + Supervisor: &roadrunner.SupervisorConfig{ + WatchTick: 60, + TTL: 1000, + IdleTTL: 10, + ExecTTL: 10, + MaxWorkerMemory: 1000, + }, +} + +type Foo struct { + configProvider config.Configurer + wf server.WorkerFactory + pool roadrunner.Pool +} + +func (f *Foo) Init(p config.Configurer, workerFactory server.WorkerFactory) error { + f.configProvider = p + f.wf = workerFactory + return nil +} + +func (f *Foo) Serve() chan error { + const op = errors.Op("serve") + + // test payload for echo + r := roadrunner.Payload{ + Context: nil, + Body: []byte(Response), + } + + errCh := make(chan error, 1) + + conf := &plugin.Config{} + var err error + err = f.configProvider.UnmarshalKey(ConfigSection, conf) + if err != nil { + errCh <- err + return errCh + } + + // test CMDFactory + cmd, err := f.wf.CmdFactory(nil) + if err != nil { + errCh <- err + return errCh + } + if cmd == nil { + errCh <- errors.E(op, "command is nil") + return errCh + } + + // test worker creation + w, err := f.wf.NewWorker(context.Background(), nil) + if err != nil { + errCh <- err + return errCh + } + + // test that our worker is functional + sw, err := roadrunner.NewSyncWorker(w) + if err != nil { + errCh <- err + return errCh + } + + rsp, err := sw.Exec(r) + if err != nil { + errCh <- err + return errCh + } + + if string(rsp.Body) != Response { + errCh <- errors.E("response from worker is wrong", errors.Errorf("response: %s", rsp.Body)) + return errCh + } + + // should not be errors + err = sw.Stop(context.Background()) + if err != nil { + errCh <- err + return errCh + } + + // test pool + f.pool, err = f.wf.NewWorkerPool(context.Background(), testPoolConfig, nil) + if err != nil { + errCh <- err + return errCh + } + + // test pool execution + rsp, err = f.pool.Exec(r) + if err != nil { + errCh <- err + return errCh + } + + // echo of the "test" should be -> test + if string(rsp.Body) != Response { + errCh <- errors.E("response from worker is wrong", errors.Errorf("response: %s", rsp.Body)) + return errCh + } + + return errCh +} + +func (f *Foo) Stop() error { + f.pool.Destroy(context.Background()) + return nil +} diff --git a/plugins/server/tests/plugin_sockets.go b/plugins/server/tests/plugin_sockets.go new file mode 100644 index 00000000..b12f4ead --- /dev/null +++ b/plugins/server/tests/plugin_sockets.go @@ -0,0 +1,112 @@ +package tests + +import ( + "context" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2" + "github.com/spiral/roadrunner/v2/interfaces/server" + "github.com/spiral/roadrunner/v2/plugins/config" + plugin "github.com/spiral/roadrunner/v2/plugins/server" +) + +type Foo2 struct { + configProvider config.Configurer + wf server.WorkerFactory + pool roadrunner.Pool +} + +func (f *Foo2) Init(p config.Configurer, workerFactory server.WorkerFactory) error { + f.configProvider = p + f.wf = workerFactory + return nil +} + +func (f *Foo2) Serve() chan error { + const op = errors.Op("serve") + var err error + errCh := make(chan error, 1) + conf := &plugin.Config{} + + // test payload for echo + r := roadrunner.Payload{ + Context: nil, + Body: []byte(Response), + } + + err = f.configProvider.UnmarshalKey(ConfigSection, conf) + if err != nil { + errCh <- err + return errCh + } + + // test CMDFactory + cmd, err := f.wf.CmdFactory(nil) + if err != nil { + errCh <- err + return errCh + } + if cmd == nil { + errCh <- errors.E(op, "command is nil") + return errCh + } + + // test worker creation + w, err := f.wf.NewWorker(context.Background(), nil) + if err != nil { + errCh <- err + return errCh + } + + // test that our worker is functional + sw, err := roadrunner.NewSyncWorker(w) + if err != nil { + errCh <- err + return errCh + } + + rsp, err := sw.Exec(r) + if err != nil { + errCh <- err + return errCh + } + + if string(rsp.Body) != Response { + errCh <- errors.E("response from worker is wrong", errors.Errorf("response: %s", rsp.Body)) + return errCh + } + + // should not be errors + err = sw.Stop(context.Background()) + if err != nil { + errCh <- err + return errCh + } + + // test pool + f.pool, err = f.wf.NewWorkerPool(context.Background(), testPoolConfig, nil) + if err != nil { + errCh <- err + return errCh + } + + // test pool execution + rsp, err = f.pool.Exec(r) + if err != nil { + errCh <- err + return errCh + } + + // echo of the "test" should be -> test + if string(rsp.Body) != Response { + errCh <- errors.E("response from worker is wrong", errors.Errorf("response: %s", rsp.Body)) + return errCh + } + + return errCh +} + +func (f *Foo2) Stop() error { + f.pool.Destroy(context.Background()) + return nil +} diff --git a/plugins/server/tests/plugin_tcp.go b/plugins/server/tests/plugin_tcp.go new file mode 100644 index 00000000..39044577 --- /dev/null +++ b/plugins/server/tests/plugin_tcp.go @@ -0,0 +1,112 @@ +package tests + +import ( + "context" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2" + "github.com/spiral/roadrunner/v2/interfaces/server" + "github.com/spiral/roadrunner/v2/plugins/config" + plugin "github.com/spiral/roadrunner/v2/plugins/server" +) + +type Foo3 struct { + configProvider config.Configurer + wf server.WorkerFactory + pool roadrunner.Pool +} + +func (f *Foo3) Init(p config.Configurer, workerFactory server.WorkerFactory) error { + f.configProvider = p + f.wf = workerFactory + return nil +} + +func (f *Foo3) Serve() chan error { + const op = errors.Op("serve") + var err error + errCh := make(chan error, 1) + conf := &plugin.Config{} + + // test payload for echo + r := roadrunner.Payload{ + Context: nil, + Body: []byte(Response), + } + + err = f.configProvider.UnmarshalKey(ConfigSection, conf) + if err != nil { + errCh <- err + return errCh + } + + // test CMDFactory + cmd, err := f.wf.CmdFactory(nil) + if err != nil { + errCh <- err + return errCh + } + if cmd == nil { + errCh <- errors.E(op, "command is nil") + return errCh + } + + // test worker creation + w, err := f.wf.NewWorker(context.Background(), nil) + if err != nil { + errCh <- err + return errCh + } + + // test that our worker is functional + sw, err := roadrunner.NewSyncWorker(w) + if err != nil { + errCh <- err + return errCh + } + + rsp, err := sw.Exec(r) + if err != nil { + errCh <- err + return errCh + } + + if string(rsp.Body) != Response { + errCh <- errors.E("response from worker is wrong", errors.Errorf("response: %s", rsp.Body)) + return errCh + } + + // should not be errors + err = sw.Stop(context.Background()) + if err != nil { + errCh <- err + return errCh + } + + // test pool + f.pool, err = f.wf.NewWorkerPool(context.Background(), testPoolConfig, nil) + if err != nil { + errCh <- err + return errCh + } + + // test pool execution + rsp, err = f.pool.Exec(r) + if err != nil { + errCh <- err + return errCh + } + + // echo of the "test" should be -> test + if string(rsp.Body) != Response { + errCh <- errors.E("response from worker is wrong", errors.Errorf("response: %s", rsp.Body)) + return errCh + } + + return errCh +} + +func (f *Foo3) Stop() error { + f.pool.Destroy(context.Background()) + return nil +} diff --git a/plugins/server/tests/server_test.go b/plugins/server/tests/server_test.go new file mode 100644 index 00000000..53daa67f --- /dev/null +++ b/plugins/server/tests/server_test.go @@ -0,0 +1,358 @@ +package tests + +import ( + "os" + "os/signal" + "testing" + "time" + + "github.com/spiral/endure" + "github.com/spiral/roadrunner/v2/plugins/server" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/stretchr/testify/assert" +) + +func TestAppPipes(t *testing.T) { + container, err := endure.NewContainer(nil, endure.RetryOnFail(true), endure.SetLogLevel(endure.DebugLevel)) + if err != nil { + t.Fatal(err) + } + // config plugin + vp := &config.Viper{} + vp.Path = "configs/.rr.yaml" + vp.Prefix = "rr" + err = container.Register(vp) + if err != nil { + t.Fatal(err) + } + + err = container.Register(&server.Plugin{}) + if err != nil { + t.Fatal(err) + } + + err = container.Register(&Foo{}) + if err != nil { + t.Fatal(err) + } + + err = container.Register(&logger.ZapLogger{}) + if err != nil { + t.Fatal(err) + } + + err = container.Init() + if err != nil { + t.Fatal(err) + } + + errCh, err := container.Serve() + if err != nil { + t.Fatal(err) + } + + // stop by CTRL+C + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + + // stop after 10 seconds + tt := time.NewTicker(time.Second * 10) + + for { + select { + case e := <-errCh: + assert.NoError(t, e.Error) + assert.NoError(t, container.Stop()) + return + case <-c: + er := container.Stop() + if er != nil { + panic(er) + } + return + case <-tt.C: + tt.Stop() + assert.NoError(t, container.Stop()) + return + } + } +} + +func TestAppSockets(t *testing.T) { + container, err := endure.NewContainer(nil, endure.RetryOnFail(true), endure.SetLogLevel(endure.DebugLevel)) + if err != nil { + t.Fatal(err) + } + // config plugin + vp := &config.Viper{} + vp.Path = "configs/.rr-sockets.yaml" + vp.Prefix = "rr" + err = container.Register(vp) + if err != nil { + t.Fatal(err) + } + + err = container.Register(&server.Plugin{}) + if err != nil { + t.Fatal(err) + } + + err = container.Register(&Foo2{}) + if err != nil { + t.Fatal(err) + } + + err = container.Register(&logger.ZapLogger{}) + if err != nil { + t.Fatal(err) + } + + err = container.Init() + if err != nil { + t.Fatal(err) + } + + errCh, err := container.Serve() + if err != nil { + t.Fatal(err) + } + + // stop by CTRL+C + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + + // stop after 10 seconds + tt := time.NewTicker(time.Second * 10) + + for { + select { + case e := <-errCh: + assert.NoError(t, e.Error) + assert.NoError(t, container.Stop()) + return + case <-c: + er := container.Stop() + if er != nil { + panic(er) + } + return + case <-tt.C: + tt.Stop() + assert.NoError(t, container.Stop()) + return + } + } +} + +func TestAppTCP(t *testing.T) { + container, err := endure.NewContainer(nil, endure.RetryOnFail(true), endure.SetLogLevel(endure.DebugLevel)) + if err != nil { + t.Fatal(err) + } + // config plugin + vp := &config.Viper{} + vp.Path = "configs/.rr-tcp.yaml" + vp.Prefix = "rr" + err = container.Register(vp) + if err != nil { + t.Fatal(err) + } + + err = container.Register(&server.Plugin{}) + if err != nil { + t.Fatal(err) + } + + err = container.Register(&Foo3{}) + if err != nil { + t.Fatal(err) + } + + err = container.Register(&logger.ZapLogger{}) + if err != nil { + t.Fatal(err) + } + + err = container.Init() + if err != nil { + t.Fatal(err) + } + + errCh, err := container.Serve() + if err != nil { + t.Fatal(err) + } + + // stop by CTRL+C + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + + // stop after 10 seconds + tt := time.NewTicker(time.Second * 10) + + for { + select { + case e := <-errCh: + assert.NoError(t, e.Error) + assert.NoError(t, container.Stop()) + return + case <-c: + er := container.Stop() + if er != nil { + panic(er) + } + return + case <-tt.C: + tt.Stop() + assert.NoError(t, container.Stop()) + return + } + } +} + +func TestAppWrongConfig(t *testing.T) { + container, err := endure.NewContainer(nil, endure.RetryOnFail(true), endure.SetLogLevel(endure.DebugLevel)) + if err != nil { + t.Fatal(err) + } + // config plugin + vp := &config.Viper{} + vp.Path = "configs/.rrrrrrrrrr.yaml" + vp.Prefix = "rr" + err = container.Register(vp) + if err != nil { + t.Fatal(err) + } + + err = container.Register(&server.Plugin{}) + if err != nil { + t.Fatal(err) + } + + err = container.Register(&Foo3{}) + if err != nil { + t.Fatal(err) + } + + err = container.Register(&logger.ZapLogger{}) + if err != nil { + t.Fatal(err) + } + + assert.Error(t, container.Init()) +} + +func TestAppWrongRelay(t *testing.T) { + container, err := endure.NewContainer(nil, endure.RetryOnFail(true), endure.SetLogLevel(endure.DebugLevel)) + if err != nil { + t.Fatal(err) + } + // config plugin + vp := &config.Viper{} + vp.Path = "configs/.rr-wrong-relay.yaml" + vp.Prefix = "rr" + err = container.Register(vp) + if err != nil { + t.Fatal(err) + } + + err = container.Register(&server.Plugin{}) + if err != nil { + t.Fatal(err) + } + + err = container.Register(&Foo3{}) + if err != nil { + t.Fatal(err) + } + + err = container.Register(&logger.ZapLogger{}) + if err != nil { + t.Fatal(err) + } + + err = container.Init() + if err != nil { + t.Fatal(err) + } + + _, err = container.Serve() + assert.Error(t, err) +} + +func TestAppWrongCommand(t *testing.T) { + container, err := endure.NewContainer(nil, endure.RetryOnFail(true), endure.SetLogLevel(endure.DebugLevel)) + if err != nil { + t.Fatal(err) + } + // config plugin + vp := &config.Viper{} + vp.Path = "configs/.rr-wrong-command.yaml" + vp.Prefix = "rr" + err = container.Register(vp) + if err != nil { + t.Fatal(err) + } + + err = container.Register(&server.Plugin{}) + if err != nil { + t.Fatal(err) + } + + err = container.Register(&Foo3{}) + if err != nil { + t.Fatal(err) + } + + err = container.Register(&logger.ZapLogger{}) + if err != nil { + t.Fatal(err) + } + + err = container.Init() + if err != nil { + t.Fatal(err) + } + + _, err = container.Serve() + assert.Error(t, err) +} + +func TestAppNoAppSectionInConfig(t *testing.T) { + container, err := endure.NewContainer(nil, endure.RetryOnFail(true), endure.SetLogLevel(endure.DebugLevel)) + if err != nil { + t.Fatal(err) + } + // config plugin + vp := &config.Viper{} + vp.Path = "configs/.rr-wrong-command.yaml" + vp.Prefix = "rr" + err = container.Register(vp) + if err != nil { + t.Fatal(err) + } + + err = container.Register(&server.Plugin{}) + if err != nil { + t.Fatal(err) + } + + err = container.Register(&Foo3{}) + if err != nil { + t.Fatal(err) + } + + err = container.Register(&logger.ZapLogger{}) + if err != nil { + t.Fatal(err) + } + + err = container.Init() + if err != nil { + t.Fatal(err) + } + + _, err = container.Serve() + assert.Error(t, err) +} diff --git a/plugins/server/tests/socket.php b/plugins/server/tests/socket.php new file mode 100644 index 00000000..143c3ce4 --- /dev/null +++ b/plugins/server/tests/socket.php @@ -0,0 +1,25 @@ +receive($ctx)) { + try { + $rr->send((string)$in); + } catch (\Throwable $e) { + $rr->error((string)$e); + } +} diff --git a/plugins/server/tests/tcp.php b/plugins/server/tests/tcp.php new file mode 100644 index 00000000..2d6fb00a --- /dev/null +++ b/plugins/server/tests/tcp.php @@ -0,0 +1,20 @@ +receive($ctx)) { + try { + $rr->send((string)$in); + } catch (\Throwable $e) { + $rr->error((string)$e); + } +} \ No newline at end of file -- cgit v1.2.3