diff options
author | Valery Piashchynski <[email protected]> | 2020-11-16 14:21:35 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2020-11-16 14:21:35 +0300 |
commit | 57ad958acab2d108be0a35547faf6e7a791cf069 (patch) | |
tree | 3c519708540265a7e74b03d1fe17ef4101ffcfdf | |
parent | d890eee1742dc3a0a1c787f7e65d40b1e81a94db (diff) |
Update Pool
-rw-r--r-- | interfaces/app/interface.go | 15 | ||||
-rw-r--r-- | plugins/app/plugin.go | 17 | ||||
-rw-r--r-- | plugins/app/tests/plugin_pipes.go | 2 | ||||
-rw-r--r-- | plugins/http/attributes/attributes_test.go | 3 | ||||
-rw-r--r-- | plugins/http/config.go | 31 | ||||
-rw-r--r-- | plugins/http/config_test.go | 656 | ||||
-rw-r--r-- | plugins/http/handler.go | 6 | ||||
-rw-r--r-- | plugins/http/plugin.go | 97 | ||||
-rw-r--r-- | plugins/http/request.go | 28 | ||||
-rw-r--r-- | plugins/http/response.go | 2 | ||||
-rw-r--r-- | plugins/http/test/http_test.go | 23 | ||||
-rw-r--r-- | plugins/http/test/rr-http.yaml | 67 | ||||
-rw-r--r-- | plugins/http/uploads.go | 8 | ||||
-rwxr-xr-x | pool.go | 6 | ||||
-rwxr-xr-x | static_pool.go | 12 | ||||
-rwxr-xr-x | static_pool_test.go | 24 | ||||
-rwxr-xr-x | supervisor_pool.go | 2 | ||||
-rw-r--r-- | supervisor_test.go | 6 |
18 files changed, 577 insertions, 428 deletions
diff --git a/interfaces/app/interface.go b/interfaces/app/interface.go new file mode 100644 index 00000000..4db7a094 --- /dev/null +++ b/interfaces/app/interface.go @@ -0,0 +1,15 @@ +package app + +import ( + "context" + "os/exec" + + "github.com/spiral/roadrunner/v2" +) + +// WorkerFactory creates workers for the application. +type WorkerFactory interface { + CmdFactory(env map[string]string) (func() *exec.Cmd, error) + NewWorker(ctx context.Context, env map[string]string) (roadrunner.WorkerBase, error) + NewWorkerPool(ctx context.Context, opt roadrunner.PoolConfig, env map[string]string) (roadrunner.Pool, error) +} diff --git a/plugins/app/plugin.go b/plugins/app/plugin.go index ed2880cc..fbb53c3d 100644 --- a/plugins/app/plugin.go +++ b/plugins/app/plugin.go @@ -16,15 +16,6 @@ import ( const ServiceName = "app" -type Env map[string]string - -// WorkerFactory creates workers for the application. -type WorkerFactory interface { - CmdFactory(env Env) (func() *exec.Cmd, error) - NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) - NewWorkerPool(ctx context.Context, opt roadrunner.Config, env Env) (roadrunner.Pool, error) -} - // Plugin manages worker type Plugin struct { cfg Config @@ -71,7 +62,7 @@ func (app *Plugin) Stop() error { } // CmdFactory provides worker command factory assocated with given context. -func (app *Plugin) CmdFactory(env Env) (func() *exec.Cmd, error) { +func (app *Plugin) CmdFactory(env map[string]string) (func() *exec.Cmd, error) { var cmdArgs []string // create command according to the config @@ -97,7 +88,7 @@ func (app *Plugin) CmdFactory(env Env) (func() *exec.Cmd, error) { } // NewWorker issues new standalone worker. -func (app *Plugin) NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) { +func (app *Plugin) NewWorker(ctx context.Context, env map[string]string) (roadrunner.WorkerBase, error) { const op = errors.Op("new worker") spawnCmd, err := app.CmdFactory(env) if err != nil { @@ -115,7 +106,7 @@ func (app *Plugin) NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBas } // NewWorkerPool issues new worker pool. -func (app *Plugin) NewWorkerPool(ctx context.Context, opt roadrunner.Config, env Env) (roadrunner.Pool, error) { +func (app *Plugin) NewWorkerPool(ctx context.Context, opt roadrunner.PoolConfig, env map[string]string) (roadrunner.Pool, error) { spawnCmd, err := app.CmdFactory(env) if err != nil { return nil, err @@ -159,7 +150,7 @@ func (app *Plugin) initFactory() (roadrunner.Factory, error) { } } -func (app *Plugin) setEnv(e Env) []string { +func (app *Plugin) setEnv(e map[string]string) []string { env := append(os.Environ(), fmt.Sprintf("RR_RELAY=%s", app.cfg.Relay)) for k, v := range e { env = append(env, fmt.Sprintf("%s=%s", strings.ToUpper(k), v)) diff --git a/plugins/app/tests/plugin_pipes.go b/plugins/app/tests/plugin_pipes.go index fc999718..150ab297 100644 --- a/plugins/app/tests/plugin_pipes.go +++ b/plugins/app/tests/plugin_pipes.go @@ -13,7 +13,7 @@ import ( const ConfigSection = "app" const Response = "test" -var testPoolConfig = roadrunner.Config{ +var testPoolConfig = roadrunner.PoolConfig{ NumWorkers: 10, MaxJobs: 100, AllocateTimeout: time.Second * 10, diff --git a/plugins/http/attributes/attributes_test.go b/plugins/http/attributes/attributes_test.go index 2360fd12..d914f6fa 100644 --- a/plugins/http/attributes/attributes_test.go +++ b/plugins/http/attributes/attributes_test.go @@ -1,9 +1,10 @@ package attributes import ( - "github.com/stretchr/testify/assert" "net/http" "testing" + + "github.com/stretchr/testify/assert" ) func TestAllAttributes(t *testing.T) { diff --git a/plugins/http/config.go b/plugins/http/config.go index e91133ef..b3d16b62 100644 --- a/plugins/http/config.go +++ b/plugins/http/config.go @@ -6,8 +6,37 @@ import ( "net" "os" "strings" + "time" + + "github.com/spiral/roadrunner/v2" ) +type PoolConfig struct { +} + +type ServerConfig struct { + // Command includes command strings with all the parameters, example: "php worker.php pipes". + Command string + + // User under which process will be started + User string + + // Relay defines connection method and factory to be used to connect to workers: + // "pipes", "tcp://:6001", "unix://rr.sock" + // This config section must not change on re-configuration. + Relay string + + // RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section + // must not change on re-configuration. + RelayTimeout time.Duration + + // Pool defines worker pool configuration, number of workers, timeouts and etc. This config section might change + // while server is running. + PoolCfg *roadrunner.PoolConfig + + env map[string]string +} + // Config configures RoadRunner HTTP server. type Config struct { // Port and port to handle as http server. @@ -33,7 +62,7 @@ type Config struct { Uploads *UploadsConfig // Workers configures rr server and worker pool. - Workers *roadrunner.ServerConfig + Workers *ServerConfig } // FCGIConfig for FastCGI server. diff --git a/plugins/http/config_test.go b/plugins/http/config_test.go index 96fbb954..6d23b6ca 100644 --- a/plugins/http/config_test.go +++ b/plugins/http/config_test.go @@ -1,330 +1,330 @@ package http -import ( - "os" - "testing" - "time" - - json "github.com/json-iterator/go" - "github.com/stretchr/testify/assert" -) - -type mockCfg struct{ cfg string } - -func (cfg *mockCfg) Get(name string) service.Config { return nil } -func (cfg *mockCfg) Unmarshal(out interface{}) error { - j := json.ConfigCompatibleWithStandardLibrary - return j.Unmarshal([]byte(cfg.cfg), out) -} - -func Test_Config_Hydrate_Error1(t *testing.T) { - cfg := &mockCfg{`{"address": "localhost:8080"}`} - c := &Config{} - - assert.NoError(t, c.Hydrate(cfg)) -} - -func Test_Config_Hydrate_Error2(t *testing.T) { - cfg := &mockCfg{`{"dir": "/dir/"`} - c := &Config{} - - assert.Error(t, c.Hydrate(cfg)) -} - -func Test_Config_Valid(t *testing.T) { - cfg := &Config{ - Address: ":8080", - MaxRequestSize: 1024, - HTTP2: &HTTP2Config{ - Enabled: true, - }, - Uploads: &UploadsConfig{ - Dir: os.TempDir(), - Forbid: []string{".go"}, - }, - Workers: &roadrunner.ServerConfig{ - Command: "php tests/client.php echo pipes", - Relay: "pipes", - Pool: &roadrunner.Config{ - NumWorkers: 1, - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - }, - }, - } - - assert.NoError(t, cfg.Valid()) -} - -func Test_Trusted_Subnets(t *testing.T) { - cfg := &Config{ - Address: ":8080", - MaxRequestSize: 1024, - Uploads: &UploadsConfig{ - Dir: os.TempDir(), - Forbid: []string{".go"}, - }, - HTTP2: &HTTP2Config{ - Enabled: true, - }, - TrustedSubnets: []string{"200.1.0.0/16"}, - Workers: &roadrunner.ServerConfig{ - Command: "php tests/client.php echo pipes", - Relay: "pipes", - Pool: &roadrunner.Config{ - NumWorkers: 1, - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - }, - }, - } - - assert.NoError(t, cfg.parseCIDRs()) - - assert.True(t, cfg.IsTrusted("200.1.0.10")) - assert.False(t, cfg.IsTrusted("127.0.0.0.1")) -} - -func Test_Trusted_Subnets_Err(t *testing.T) { - cfg := &Config{ - Address: ":8080", - MaxRequestSize: 1024, - Uploads: &UploadsConfig{ - Dir: os.TempDir(), - Forbid: []string{".go"}, - }, - HTTP2: &HTTP2Config{ - Enabled: true, - }, - TrustedSubnets: []string{"200.1.0.0"}, - Workers: &roadrunner.ServerConfig{ - Command: "php tests/client.php echo pipes", - Relay: "pipes", - Pool: &roadrunner.Config{ - NumWorkers: 1, - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - }, - }, - } - - assert.Error(t, cfg.parseCIDRs()) -} - -func Test_Config_Valid_SSL(t *testing.T) { - cfg := &Config{ - Address: ":8080", - SSL: SSLConfig{ - Cert: "fixtures/server.crt", - Key: "fixtures/server.key", - }, - MaxRequestSize: 1024, - Uploads: &UploadsConfig{ - Dir: os.TempDir(), - Forbid: []string{".go"}, - }, - HTTP2: &HTTP2Config{ - Enabled: true, - }, - Workers: &roadrunner.ServerConfig{ - Command: "php tests/client.php echo pipes", - Relay: "pipes", - Pool: &roadrunner.Config{ - NumWorkers: 1, - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - }, - }, - } - - assert.Error(t, cfg.Hydrate(&testCfg{httpCfg: "{}"})) - - assert.NoError(t, cfg.Valid()) - assert.True(t, cfg.EnableTLS()) - assert.Equal(t, 443, cfg.SSL.Port) -} - -func Test_Config_SSL_No_key(t *testing.T) { - cfg := &Config{ - Address: ":8080", - SSL: SSLConfig{ - Cert: "fixtures/server.crt", - }, - MaxRequestSize: 1024, - Uploads: &UploadsConfig{ - Dir: os.TempDir(), - Forbid: []string{".go"}, - }, - HTTP2: &HTTP2Config{ - Enabled: true, - }, - Workers: &roadrunner.ServerConfig{ - Command: "php tests/client.php echo pipes", - Relay: "pipes", - Pool: &roadrunner.Config{ - NumWorkers: 1, - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - }, - }, - } - - assert.Error(t, cfg.Valid()) -} - -func Test_Config_SSL_No_Cert(t *testing.T) { - cfg := &Config{ - Address: ":8080", - SSL: SSLConfig{ - Key: "fixtures/server.key", - }, - MaxRequestSize: 1024, - Uploads: &UploadsConfig{ - Dir: os.TempDir(), - Forbid: []string{".go"}, - }, - HTTP2: &HTTP2Config{ - Enabled: true, - }, - Workers: &roadrunner.ServerConfig{ - Command: "php tests/client.php echo pipes", - Relay: "pipes", - Pool: &roadrunner.Config{ - NumWorkers: 1, - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - }, - }, - } - - assert.Error(t, cfg.Valid()) -} - -func Test_Config_NoUploads(t *testing.T) { - cfg := &Config{ - Address: ":8080", - MaxRequestSize: 1024, - HTTP2: &HTTP2Config{ - Enabled: true, - }, - Workers: &roadrunner.ServerConfig{ - Command: "php tests/client.php echo pipes", - Relay: "pipes", - Pool: &roadrunner.Config{ - NumWorkers: 1, - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - }, - }, - } - - assert.Error(t, cfg.Valid()) -} - -func Test_Config_NoHTTP2(t *testing.T) { - cfg := &Config{ - Address: ":8080", - MaxRequestSize: 1024, - Uploads: &UploadsConfig{ - Dir: os.TempDir(), - Forbid: []string{".go"}, - }, - Workers: &roadrunner.ServerConfig{ - Command: "php tests/client.php echo pipes", - Relay: "pipes", - Pool: &roadrunner.Config{ - NumWorkers: 0, - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - }, - }, - } - - assert.Error(t, cfg.Valid()) -} - -func Test_Config_NoWorkers(t *testing.T) { - cfg := &Config{ - Address: ":8080", - MaxRequestSize: 1024, - HTTP2: &HTTP2Config{ - Enabled: true, - }, - Uploads: &UploadsConfig{ - Dir: os.TempDir(), - Forbid: []string{".go"}, - }, - } - - assert.Error(t, cfg.Valid()) -} - -func Test_Config_NoPool(t *testing.T) { - cfg := &Config{ - Address: ":8080", - MaxRequestSize: 1024, - Uploads: &UploadsConfig{ - Dir: os.TempDir(), - Forbid: []string{".go"}, - }, - HTTP2: &HTTP2Config{ - Enabled: true, - }, - Workers: &roadrunner.ServerConfig{ - Command: "php tests/client.php echo pipes", - Relay: "pipes", - Pool: &roadrunner.Config{ - NumWorkers: 0, - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - }, - }, - } - - assert.Error(t, cfg.Valid()) -} - -func Test_Config_DeadPool(t *testing.T) { - cfg := &Config{ - Address: ":8080", - MaxRequestSize: 1024, - Uploads: &UploadsConfig{ - Dir: os.TempDir(), - Forbid: []string{".go"}, - }, - HTTP2: &HTTP2Config{ - Enabled: true, - }, - Workers: &roadrunner.ServerConfig{ - Command: "php tests/client.php echo pipes", - Relay: "pipes", - }, - } - - assert.Error(t, cfg.Valid()) -} - -func Test_Config_InvalidAddress(t *testing.T) { - cfg := &Config{ - Address: "unexpected_address", - MaxRequestSize: 1024, - Uploads: &UploadsConfig{ - Dir: os.TempDir(), - Forbid: []string{".go"}, - }, - HTTP2: &HTTP2Config{ - Enabled: true, - }, - Workers: &roadrunner.ServerConfig{ - Command: "php tests/client.php echo pipes", - Relay: "pipes", - Pool: &roadrunner.Config{ - NumWorkers: 1, - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - }, - }, - } - - assert.Error(t, cfg.Valid()) -} +//import ( +// "os" +// "testing" +// "time" +// +// json "github.com/json-iterator/go" +// "github.com/stretchr/testify/assert" +//) +// +//type mockCfg struct{ cfg string } +// +//func (cfg *mockCfg) Get(name string) service.Config { return nil } +//func (cfg *mockCfg) Unmarshal(out interface{}) error { +// j := json.ConfigCompatibleWithStandardLibrary +// return j.Unmarshal([]byte(cfg.cfg), out) +//} +// +//func Test_Config_Hydrate_Error1(t *testing.T) { +// cfg := &mockCfg{`{"address": "localhost:8080"}`} +// c := &Config{} +// +// assert.NoError(t, c.Hydrate(cfg)) +//} +// +//func Test_Config_Hydrate_Error2(t *testing.T) { +// cfg := &mockCfg{`{"dir": "/dir/"`} +// c := &Config{} +// +// assert.Error(t, c.Hydrate(cfg)) +//} +// +//func Test_Config_Valid(t *testing.T) { +// cfg := &Config{ +// Address: ":8080", +// MaxRequestSize: 1024, +// HTTP2: &HTTP2Config{ +// Enabled: true, +// }, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{".go"}, +// }, +// Workers: &roadrunner.ServerConfig{ +// Command: "php tests/client.php echo pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: time.Second, +// DestroyTimeout: time.Second, +// }, +// }, +// } +// +// assert.NoError(t, cfg.Valid()) +//} +// +//func Test_Trusted_Subnets(t *testing.T) { +// cfg := &Config{ +// Address: ":8080", +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{".go"}, +// }, +// HTTP2: &HTTP2Config{ +// Enabled: true, +// }, +// TrustedSubnets: []string{"200.1.0.0/16"}, +// Workers: &roadrunner.ServerConfig{ +// Command: "php tests/client.php echo pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: time.Second, +// DestroyTimeout: time.Second, +// }, +// }, +// } +// +// assert.NoError(t, cfg.parseCIDRs()) +// +// assert.True(t, cfg.IsTrusted("200.1.0.10")) +// assert.False(t, cfg.IsTrusted("127.0.0.0.1")) +//} +// +//func Test_Trusted_Subnets_Err(t *testing.T) { +// cfg := &Config{ +// Address: ":8080", +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{".go"}, +// }, +// HTTP2: &HTTP2Config{ +// Enabled: true, +// }, +// TrustedSubnets: []string{"200.1.0.0"}, +// Workers: &roadrunner.ServerConfig{ +// Command: "php tests/client.php echo pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: time.Second, +// DestroyTimeout: time.Second, +// }, +// }, +// } +// +// assert.Error(t, cfg.parseCIDRs()) +//} +// +//func Test_Config_Valid_SSL(t *testing.T) { +// cfg := &Config{ +// Address: ":8080", +// SSL: SSLConfig{ +// Cert: "fixtures/server.crt", +// Key: "fixtures/server.key", +// }, +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{".go"}, +// }, +// HTTP2: &HTTP2Config{ +// Enabled: true, +// }, +// Workers: &roadrunner.ServerConfig{ +// Command: "php tests/client.php echo pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: time.Second, +// DestroyTimeout: time.Second, +// }, +// }, +// } +// +// assert.Error(t, cfg.Hydrate(&testCfg{httpCfg: "{}"})) +// +// assert.NoError(t, cfg.Valid()) +// assert.True(t, cfg.EnableTLS()) +// assert.Equal(t, 443, cfg.SSL.Port) +//} +// +//func Test_Config_SSL_No_key(t *testing.T) { +// cfg := &Config{ +// Address: ":8080", +// SSL: SSLConfig{ +// Cert: "fixtures/server.crt", +// }, +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{".go"}, +// }, +// HTTP2: &HTTP2Config{ +// Enabled: true, +// }, +// Workers: &roadrunner.ServerConfig{ +// Command: "php tests/client.php echo pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: time.Second, +// DestroyTimeout: time.Second, +// }, +// }, +// } +// +// assert.Error(t, cfg.Valid()) +//} +// +//func Test_Config_SSL_No_Cert(t *testing.T) { +// cfg := &Config{ +// Address: ":8080", +// SSL: SSLConfig{ +// Key: "fixtures/server.key", +// }, +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{".go"}, +// }, +// HTTP2: &HTTP2Config{ +// Enabled: true, +// }, +// Workers: &roadrunner.ServerConfig{ +// Command: "php tests/client.php echo pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: time.Second, +// DestroyTimeout: time.Second, +// }, +// }, +// } +// +// assert.Error(t, cfg.Valid()) +//} +// +//func Test_Config_NoUploads(t *testing.T) { +// cfg := &Config{ +// Address: ":8080", +// MaxRequestSize: 1024, +// HTTP2: &HTTP2Config{ +// Enabled: true, +// }, +// Workers: &roadrunner.ServerConfig{ +// Command: "php tests/client.php echo pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: time.Second, +// DestroyTimeout: time.Second, +// }, +// }, +// } +// +// assert.Error(t, cfg.Valid()) +//} +// +//func Test_Config_NoHTTP2(t *testing.T) { +// cfg := &Config{ +// Address: ":8080", +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{".go"}, +// }, +// Workers: &roadrunner.ServerConfig{ +// Command: "php tests/client.php echo pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 0, +// AllocateTimeout: time.Second, +// DestroyTimeout: time.Second, +// }, +// }, +// } +// +// assert.Error(t, cfg.Valid()) +//} +// +//func Test_Config_NoWorkers(t *testing.T) { +// cfg := &Config{ +// Address: ":8080", +// MaxRequestSize: 1024, +// HTTP2: &HTTP2Config{ +// Enabled: true, +// }, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{".go"}, +// }, +// } +// +// assert.Error(t, cfg.Valid()) +//} +// +//func Test_Config_NoPool(t *testing.T) { +// cfg := &Config{ +// Address: ":8080", +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{".go"}, +// }, +// HTTP2: &HTTP2Config{ +// Enabled: true, +// }, +// Workers: &roadrunner.ServerConfig{ +// Command: "php tests/client.php echo pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 0, +// AllocateTimeout: time.Second, +// DestroyTimeout: time.Second, +// }, +// }, +// } +// +// assert.Error(t, cfg.Valid()) +//} +// +//func Test_Config_DeadPool(t *testing.T) { +// cfg := &Config{ +// Address: ":8080", +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{".go"}, +// }, +// HTTP2: &HTTP2Config{ +// Enabled: true, +// }, +// Workers: &roadrunner.ServerConfig{ +// Command: "php tests/client.php echo pipes", +// Relay: "pipes", +// }, +// } +// +// assert.Error(t, cfg.Valid()) +//} +// +//func Test_Config_InvalidAddress(t *testing.T) { +// cfg := &Config{ +// Address: "unexpected_address", +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{".go"}, +// }, +// HTTP2: &HTTP2Config{ +// Enabled: true, +// }, +// Workers: &roadrunner.ServerConfig{ +// Command: "php tests/client.php echo pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: time.Second, +// DestroyTimeout: time.Second, +// }, +// }, +// } +// +// assert.Error(t, cfg.Valid()) +//} diff --git a/plugins/http/handler.go b/plugins/http/handler.go index 2bda4f1d..5b612d7e 100644 --- a/plugins/http/handler.go +++ b/plugins/http/handler.go @@ -10,6 +10,8 @@ import ( "time" "github.com/pkg/errors" + "github.com/spiral/roadrunner/v2" + "github.com/spiral/roadrunner/v2/interfaces/log" ) const ( @@ -60,8 +62,8 @@ func (e *ResponseEvent) Elapsed() time.Duration { // parsed files and query, payload will include parsed form dataTree (if any). type Handler struct { cfg *Config - log *logrus.Logger - rr *roadrunner.Server + log log.Logger + rr roadrunner.Pool mul sync.Mutex lsn func(event int, ctx interface{}) } diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go index 7c46c814..24eaa68c 100644 --- a/plugins/http/plugin.go +++ b/plugins/http/plugin.go @@ -4,7 +4,6 @@ import ( "context" "crypto/tls" "crypto/x509" - "errors" "fmt" "io/ioutil" "net/http" @@ -13,9 +12,11 @@ import ( "strings" "sync" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2" + factory "github.com/spiral/roadrunner/v2/interfaces/app" "github.com/spiral/roadrunner/v2/interfaces/log" "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/rpc" "golang.org/x/net/http2" "golang.org/x/net/http2/h2c" "golang.org/x/sys/cpu" @@ -23,64 +24,85 @@ import ( const ( // ID contains default service name. - ID = "http" + ServiceName = "http" // EventInitSSL thrown at moment of https initialization. SSL server passed as context. EventInitSSL = 750 ) -var couldNotAppendPemError = errors.New("could not append Certs from PEM") +//var couldNotAppendPemError = errors.New("could not append Certs from PEM") // http middleware type. type middleware func(f http.HandlerFunc) http.HandlerFunc // Service manages rr, http servers. -type Service struct { +type Plugin struct { sync.Mutex sync.WaitGroup - cfg *Config - log *logrus.Logger - cprod roadrunner.CommandProducer - env env.Environment - lsns []func(event int, ctx interface{}) - mdwr []middleware + cfg *Config + log log.Logger - rr *roadrunner.Server - controller roadrunner.Controller - handler *Handler + //cprod roadrunner.CommandProducer + env map[string]string + lsns []func(event int, ctx interface{}) + mdwr []middleware + + rr roadrunner.Pool + //controller roadrunner.Controller + handler *Handler http *http.Server https *http.Server fcgi *http.Server } -// Attach attaches controller. Currently only one controller is supported. -func (s *Service) Attach(w roadrunner.Controller) { - s.controller = w -} - -// ProduceCommands changes the default command generator method -func (s *Service) ProduceCommands(producer roadrunner.CommandProducer) { - s.cprod = producer -} +//// Attach attaches controller. Currently only one controller is supported. +//func (s *Service) Attach(w roadrunner.Controller) { +// s.controller = w +//} +// +//// ProduceCommands changes the default command generator method +//func (s *Service) ProduceCommands(producer roadrunner.CommandProducer) { +// s.cprod = producer +//} // AddMiddleware adds new net/http mdwr. -func (s *Service) AddMiddleware(m middleware) { +func (s *Plugin) AddMiddleware(m middleware) { s.mdwr = append(s.mdwr, m) } // AddListener attaches server event controller. -func (s *Service) AddListener(l func(event int, ctx interface{})) { +func (s *Plugin) AddListener(l func(event int, ctx interface{})) { s.lsns = append(s.lsns, l) } // Init must return configure svc and return true if svc hasStatus enabled. Must return error in case of // misconfiguration. Services must not be used without proper configuration pushed first. -func (s *Service) Init(cfg config.Configurer, r rpc.Plugin, log log.Logger) (bool, error) { - s.cfg = cfg +func (s *Plugin) Init(cfg config.Configurer, log log.Logger, app factory.WorkerFactory) error { + const op = errors.Op("http Init") + err := cfg.UnmarshalKey(ServiceName, &s.cfg) + if err != nil { + return errors.E(op, err) + } + s.log = log + p, err := app.NewWorkerPool(context.Background(), roadrunner.PoolConfig{ + Debug: s.cfg.Workers.PoolCfg.Debug, + NumWorkers: s.cfg.Workers.PoolCfg.NumWorkers, + MaxJobs: s.cfg.Workers.PoolCfg.MaxJobs, + AllocateTimeout: s.cfg.Workers.PoolCfg.AllocateTimeout, + DestroyTimeout: s.cfg.Workers.PoolCfg.DestroyTimeout, + Supervisor: nil, + }, nil) + + if err != nil { + return errors.E(op, err) + } + + s.rr = p + //if r != nil { // if err := r.Register(ID, &rpcServer{s}); err != nil { // return false, err @@ -91,11 +113,11 @@ func (s *Service) Init(cfg config.Configurer, r rpc.Plugin, log log.Logger) (boo // return false, nil //} - return true, nil + return nil } // Serve serves the svc. -func (s *Service) Serve() error { +func (s *Plugin) Serve() error { s.Lock() if s.env != nil { @@ -190,11 +212,12 @@ func (s *Service) Serve() error { err <- nil }() } + return <-err } // Stop stops the http. -func (s *Service) Stop() { +func (s *Plugin) Stop() { s.Lock() defer s.Unlock() @@ -240,7 +263,7 @@ func (s *Service) Stop() { } // Server returns associated rr server (if any). -func (s *Service) Server() *roadrunner.Server { +func (s *Plugin) Server() *roadrunner.Server { s.Lock() defer s.Unlock() @@ -276,7 +299,7 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) { } // append RootCA to the https server TLS config -func (s *Service) appendRootCa() error { +func (s *Plugin) appendRootCa() error { rootCAs, err := x509.SystemCertPool() if err != nil { s.throw(EventInitSSL, nil) @@ -307,7 +330,7 @@ func (s *Service) appendRootCa() error { } // Init https server -func (s *Service) initSSL() *http.Server { +func (s *Plugin) initSSL() *http.Server { var topCipherSuites []uint16 var defaultCipherSuitesTLS13 []uint16 @@ -377,14 +400,14 @@ func (s *Service) initSSL() *http.Server { } // init http/2 server -func (s *Service) initHTTP2() error { +func (s *Plugin) initHTTP2() error { return http2.ConfigureServer(s.https, &http2.Server{ MaxConcurrentStreams: s.cfg.HTTP2.MaxConcurrentStreams, }) } // serveFCGI starts FastCGI server. -func (s *Service) serveFCGI() error { +func (s *Plugin) serveFCGI() error { l, err := util.CreateListener(s.cfg.FCGI.Address) if err != nil { return err @@ -399,7 +422,7 @@ func (s *Service) serveFCGI() error { } // throw handles service, server and pool events. -func (s *Service) throw(event int, ctx interface{}) { +func (s *Plugin) throw(event int, ctx interface{}) { for _, l := range s.lsns { l(event, ctx) } @@ -411,7 +434,7 @@ func (s *Service) throw(event int, ctx interface{}) { } // tlsAddr replaces listen or host port with port configured by SSL config. -func (s *Service) tlsAddr(host string, forcePort bool) string { +func (s *Plugin) tlsAddr(host string, forcePort bool) string { // remove current forcePort first host = strings.Split(host, ":")[0] diff --git a/plugins/http/request.go b/plugins/http/request.go index b3123eb2..7e9839b2 100644 --- a/plugins/http/request.go +++ b/plugins/http/request.go @@ -9,6 +9,7 @@ import ( "strings" json "github.com/json-iterator/go" + "github.com/spiral/roadrunner/v2" "github.com/spiral/roadrunner/v2/interfaces/log" ) @@ -66,8 +67,8 @@ func fetchIP(pair string) string { } // NewRequest creates new PSR7 compatible request using net/http request. -func NewRequest(r *http.Request, cfg *UploadsConfig) (req *Request, err error) { - req = &Request{ +func NewRequest(r *http.Request, cfg *UploadsConfig) (*Request, error) { + req := &Request{ RemoteAddr: fetchIP(r.RemoteAddr), Protocol: r.Proto, Method: r.Method, @@ -75,7 +76,7 @@ func NewRequest(r *http.Request, cfg *UploadsConfig) (req *Request, err error) { Header: r.Header, Cookies: make(map[string]string), RawQuery: r.URL.RawQuery, - Attributes: attributes.All(r), + //Attributes: attributes.All(r), } for _, c := range r.Cookies() { @@ -89,18 +90,19 @@ func NewRequest(r *http.Request, cfg *UploadsConfig) (req *Request, err error) { return req, nil case contentStream: + var err error req.body, err = ioutil.ReadAll(r.Body) return req, err case contentMultipart: - if err = r.ParseMultipartForm(defaultMaxMemory); err != nil { + if err := r.ParseMultipartForm(defaultMaxMemory); err != nil { return nil, err } req.Uploads = parseUploads(r, cfg) fallthrough case contentFormData: - if err = r.ParseForm(); err != nil { + if err := r.ParseForm(); err != nil { return nil, err } @@ -121,7 +123,7 @@ func (r *Request) Open(log log.Logger) { } // Close clears all temp file uploads -func (r *Request) Close(log *logrus.Logger) { +func (r *Request) Close(log log.Logger) { if r.Uploads == nil { return } @@ -131,17 +133,17 @@ func (r *Request) Close(log *logrus.Logger) { // Payload request marshaled RoadRunner payload based on PSR7 data. values encode method is JSON. Make sure to open // files prior to calling this method. -func (r *Request) Payload() (p *roadrunner.Payload, err error) { - p = &roadrunner.Payload{} +func (r *Request) Payload() (roadrunner.Payload, error) { + p := roadrunner.Payload{} - j := json.ConfigCompatibleWithStandardLibrary - if p.Context, err = j.Marshal(r); err != nil { - return nil, err + var err error + if p.Context, err = json.Marshal(r); err != nil { + return roadrunner.EmptyPayload, err } if r.Parsed { - if p.Body, err = j.Marshal(r.body); err != nil { - return nil, err + if p.Body, err = json.Marshal(r.body); err != nil { + return roadrunner.EmptyPayload, err } } else if r.body != nil { p.Body = r.body.([]byte) diff --git a/plugins/http/response.go b/plugins/http/response.go index f34754be..88848b9d 100644 --- a/plugins/http/response.go +++ b/plugins/http/response.go @@ -6,8 +6,6 @@ import ( "strings" json "github.com/json-iterator/go" - - "github.com/spiral/roadrunner" ) diff --git a/plugins/http/test/http_test.go b/plugins/http/test/http_test.go new file mode 100644 index 00000000..c109d930 --- /dev/null +++ b/plugins/http/test/http_test.go @@ -0,0 +1,23 @@ +package test + +import ( + "testing" + + "github.com/spiral/endure" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/stretchr/testify/assert" +) + +func TestHTTPInit(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.DebugLevel), endure.Visualize(endure.StdOut, "")) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: ".rr-http.yaml", + Prefix: "", + } + err = cont.RegisterAll( + + ) + assert.NoError(t, err) +} diff --git a/plugins/http/test/rr-http.yaml b/plugins/http/test/rr-http.yaml new file mode 100644 index 00000000..8a70ce1c --- /dev/null +++ b/plugins/http/test/rr-http.yaml @@ -0,0 +1,67 @@ +http: + address: 0.0.0.0:8080 + maxRequestSize: 200 + uploads: + forbid: [ ".php", ".exe", ".bat" ] + trustedSubnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ] + + ssl: + port: 443 + redirect: true + cert: server.crt + key: server.key + rootCa: root.crt + fcgi: + address: tcp://0.0.0.0:6920 + http2: + enabled: true + h2c: true + maxConcurrentStreams: 128 + + workers: + command: "php psr-worker.php pipes" + user: "" + + # connection method (pipes, tcp://:9000, unix://socket.unix). default "pipes" + relay: "pipes" + + pool: + numWorkers: 4 + maxJobs: 0 + allocateTimeout: 60 + destroyTimeout: 60 + + +transport: + http: + address: 0.0.0.0:8080 + maxRequestSize: 200 + uploads: + forbid: [ ".php", ".exe", ".bat" ] + trustedSubnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ] + + ssl: + port: 443 + redirect: true + cert: server.crt + key: server.key + rootCa: root.crt + fcgi: + address: tcp://0.0.0.0:6920 + http2: + enabled: true + h2c: true + maxConcurrentStreams: 128 + + workers: + command: "php psr-worker.php pipes" + user: "" + + # connection method (pipes, tcp://:9000, unix://socket.unix). default "pipes" + relay: "pipes" + + pool: + numWorkers: 4 + maxJobs: 0 + allocateTimeout: 60 + destroyTimeout: 60
\ No newline at end of file diff --git a/plugins/http/uploads.go b/plugins/http/uploads.go index 99427b69..2a1524ef 100644 --- a/plugins/http/uploads.go +++ b/plugins/http/uploads.go @@ -1,8 +1,6 @@ package http import ( - "fmt" - json "github.com/json-iterator/go" "github.com/spiral/roadrunner/v2/interfaces/log" @@ -58,7 +56,7 @@ func (u *Uploads) Open(log log.Logger) { defer wg.Done() err := f.Open(u.cfg) if err != nil && log != nil { - log.Error(fmt.Errorf("error opening the file: error %v", err)) + log.Error("error opening the file", "err", err) } }(f) } @@ -67,12 +65,12 @@ func (u *Uploads) Open(log log.Logger) { } // Clear deletes all temporary files. -func (u *Uploads) Clear(log *logrus.Logger) { +func (u *Uploads) Clear(log log.Logger) { for _, f := range u.list { if f.TempFilename != "" && exists(f.TempFilename) { err := os.Remove(f.TempFilename) if err != nil && log != nil { - log.Error(fmt.Errorf("error removing the file: error %v", err)) + log.Error("error removing the file", "err", err) } } } @@ -49,7 +49,7 @@ type Pool interface { AddListener(listener util.EventListener) // GetConfig returns pool configuration. - GetConfig() Config + GetConfig() PoolConfig // Exec Exec(rqs Payload) (Payload, error) @@ -67,7 +67,7 @@ type Pool interface { } // Configures the pool behaviour. -type Config struct { +type PoolConfig struct { // Debug flag creates new fresh worker before every request. Debug bool @@ -93,7 +93,7 @@ type Config struct { } // InitDefaults enables default config values. -func (cfg *Config) InitDefaults() { +func (cfg *PoolConfig) InitDefaults() { if cfg.NumWorkers == 0 { cfg.NumWorkers = int64(runtime.NumCPU()) } diff --git a/static_pool.go b/static_pool.go index 0e5ee050..4e49b212 100755 --- a/static_pool.go +++ b/static_pool.go @@ -29,7 +29,7 @@ type PoolOptions func(p *StaticPool) // StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of stack. type StaticPool struct { - cfg Config + cfg PoolConfig // worker command creator cmd func() *exec.Cmd @@ -52,7 +52,7 @@ type StaticPool struct { } // NewPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker. -func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg Config, options ...PoolOptions) (Pool, error) { +func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg PoolConfig, options ...PoolOptions) (Pool, error) { const op = errors.Op("NewPool") cfg.InitDefaults() @@ -75,13 +75,13 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg Con workers, err := p.allocateWorkers(ctx, p.cfg.NumWorkers) if err != nil { - return nil, err + return nil, errors.E(op, err) } // put stack in the pool err = p.ww.AddToWatch(ctx, workers) if err != nil { - return nil, err + return nil, errors.E(op, err) } p.errEncoder = defaultErrEncoder(p) @@ -119,8 +119,8 @@ func (sp *StaticPool) AddListener(listener util.EventListener) { sp.events.AddListener(listener) } -// Config returns associated pool configuration. Immutable. -func (sp *StaticPool) GetConfig() Config { +// PoolConfig returns associated pool configuration. Immutable. +func (sp *StaticPool) GetConfig() PoolConfig { return sp.cfg } diff --git a/static_pool_test.go b/static_pool_test.go index d661c34d..27907af5 100755 --- a/static_pool_test.go +++ b/static_pool_test.go @@ -14,7 +14,7 @@ import ( "github.com/stretchr/testify/assert" ) -var cfg = Config{ +var cfg = PoolConfig{ NumWorkers: int64(runtime.NumCPU()), AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -52,7 +52,7 @@ func Test_ConfigNoErrorInitDefaults(t *testing.T) { context.Background(), func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, NewPipeFactory(), - Config{ + PoolConfig{ AllocateTimeout: time.Second, DestroyTimeout: time.Second, }, @@ -252,7 +252,7 @@ func Test_StaticPool_AllocateTimeout(t *testing.T) { context.Background(), func() *exec.Cmd { return exec.Command("php", "tests/client.php", "delay", "pipes") }, NewPipeFactory(), - Config{ + PoolConfig{ NumWorkers: 1, AllocateTimeout: time.Nanosecond * 1, DestroyTimeout: time.Second * 2, @@ -268,7 +268,7 @@ func Test_StaticPool_Replace_Worker(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "pid", "pipes") }, NewPipeFactory(), - Config{ + PoolConfig{ NumWorkers: 1, MaxJobs: 1, AllocateTimeout: time.Second, @@ -305,7 +305,7 @@ func Test_StaticPool_Debug_Worker(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "pid", "pipes") }, NewPipeFactory(), - Config{ + PoolConfig{ Debug: true, AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -345,7 +345,7 @@ func Test_StaticPool_Stop_Worker(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "stop", "pipes") }, NewPipeFactory(), - Config{ + PoolConfig{ NumWorkers: 1, AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -385,7 +385,7 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "delay", "pipes") }, NewPipeFactory(), - Config{ + PoolConfig{ NumWorkers: 1, AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -407,7 +407,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "delay", "pipes") }, NewPipeFactory(), - Config{ + PoolConfig{ NumWorkers: 1, AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -437,7 +437,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) { context.Background(), func() *exec.Cmd { return exec.Command("php", "tests/slow-destroy.php", "echo", "pipes") }, NewPipeFactory(), - Config{ + PoolConfig{ NumWorkers: 5, AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -462,7 +462,7 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) { context.Background(), func() *exec.Cmd { return exec.Command("php", "tests/slow-destroy.php", "echo", "pipes") }, NewPipeFactory(), - Config{ + PoolConfig{ NumWorkers: 5, AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -503,7 +503,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, NewPipeFactory(), - Config{ + PoolConfig{ NumWorkers: int64(runtime.NumCPU()), AllocateTimeout: time.Second * 100, DestroyTimeout: time.Second, @@ -533,7 +533,7 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, NewPipeFactory(), - Config{ + PoolConfig{ NumWorkers: 1, MaxJobs: 1, AllocateTimeout: time.Second, diff --git a/supervisor_pool.go b/supervisor_pool.go index 43c36ae4..b354b493 100755 --- a/supervisor_pool.go +++ b/supervisor_pool.go @@ -92,7 +92,7 @@ func (sp *supervisedPool) AddListener(listener util.EventListener) { sp.pool.AddListener(listener) } -func (sp *supervisedPool) GetConfig() Config { +func (sp *supervisedPool) GetConfig() PoolConfig { return sp.pool.GetConfig() } diff --git a/supervisor_test.go b/supervisor_test.go index 34172d7d..08ea356d 100644 --- a/supervisor_test.go +++ b/supervisor_test.go @@ -9,7 +9,7 @@ import ( "github.com/stretchr/testify/assert" ) -var cfgSupervised = Config{ +var cfgSupervised = PoolConfig{ NumWorkers: int64(1), AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -69,7 +69,7 @@ func TestSupervisedPool_Exec(t *testing.T) { } func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) { - var cfgExecTTL = Config{ + var cfgExecTTL = PoolConfig{ NumWorkers: int64(1), AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -109,7 +109,7 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) { } func TestSupervisedPool_ExecTTL_OK(t *testing.T) { - var cfgExecTTL = Config{ + var cfgExecTTL = PoolConfig{ NumWorkers: int64(1), AllocateTimeout: time.Second, DestroyTimeout: time.Second, |