From 57ad958acab2d108be0a35547faf6e7a791cf069 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Mon, 16 Nov 2020 14:21:35 +0300 Subject: Update Pool --- plugins/http/attributes/attributes_test.go | 3 +- plugins/http/config.go | 31 +- plugins/http/config_test.go | 656 ++++++++++++++--------------- plugins/http/handler.go | 6 +- plugins/http/plugin.go | 97 +++-- plugins/http/request.go | 28 +- plugins/http/response.go | 2 - plugins/http/test/http_test.go | 23 + plugins/http/test/rr-http.yaml | 67 +++ plugins/http/uploads.go | 8 +- 10 files changed, 532 insertions(+), 389 deletions(-) create mode 100644 plugins/http/test/http_test.go create mode 100644 plugins/http/test/rr-http.yaml (limited to 'plugins/http') diff --git a/plugins/http/attributes/attributes_test.go b/plugins/http/attributes/attributes_test.go index 2360fd12..d914f6fa 100644 --- a/plugins/http/attributes/attributes_test.go +++ b/plugins/http/attributes/attributes_test.go @@ -1,9 +1,10 @@ package attributes import ( - "github.com/stretchr/testify/assert" "net/http" "testing" + + "github.com/stretchr/testify/assert" ) func TestAllAttributes(t *testing.T) { diff --git a/plugins/http/config.go b/plugins/http/config.go index e91133ef..b3d16b62 100644 --- a/plugins/http/config.go +++ b/plugins/http/config.go @@ -6,8 +6,37 @@ import ( "net" "os" "strings" + "time" + + "github.com/spiral/roadrunner/v2" ) +type PoolConfig struct { +} + +type ServerConfig struct { + // Command includes command strings with all the parameters, example: "php worker.php pipes". + Command string + + // User under which process will be started + User string + + // Relay defines connection method and factory to be used to connect to workers: + // "pipes", "tcp://:6001", "unix://rr.sock" + // This config section must not change on re-configuration. + Relay string + + // RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section + // must not change on re-configuration. + RelayTimeout time.Duration + + // Pool defines worker pool configuration, number of workers, timeouts and etc. This config section might change + // while server is running. + PoolCfg *roadrunner.PoolConfig + + env map[string]string +} + // Config configures RoadRunner HTTP server. type Config struct { // Port and port to handle as http server. @@ -33,7 +62,7 @@ type Config struct { Uploads *UploadsConfig // Workers configures rr server and worker pool. - Workers *roadrunner.ServerConfig + Workers *ServerConfig } // FCGIConfig for FastCGI server. diff --git a/plugins/http/config_test.go b/plugins/http/config_test.go index 96fbb954..6d23b6ca 100644 --- a/plugins/http/config_test.go +++ b/plugins/http/config_test.go @@ -1,330 +1,330 @@ package http -import ( - "os" - "testing" - "time" - - json "github.com/json-iterator/go" - "github.com/stretchr/testify/assert" -) - -type mockCfg struct{ cfg string } - -func (cfg *mockCfg) Get(name string) service.Config { return nil } -func (cfg *mockCfg) Unmarshal(out interface{}) error { - j := json.ConfigCompatibleWithStandardLibrary - return j.Unmarshal([]byte(cfg.cfg), out) -} - -func Test_Config_Hydrate_Error1(t *testing.T) { - cfg := &mockCfg{`{"address": "localhost:8080"}`} - c := &Config{} - - assert.NoError(t, c.Hydrate(cfg)) -} - -func Test_Config_Hydrate_Error2(t *testing.T) { - cfg := &mockCfg{`{"dir": "/dir/"`} - c := &Config{} - - assert.Error(t, c.Hydrate(cfg)) -} - -func Test_Config_Valid(t *testing.T) { - cfg := &Config{ - Address: ":8080", - MaxRequestSize: 1024, - HTTP2: &HTTP2Config{ - Enabled: true, - }, - Uploads: &UploadsConfig{ - Dir: os.TempDir(), - Forbid: []string{".go"}, - }, - Workers: &roadrunner.ServerConfig{ - Command: "php tests/client.php echo pipes", - Relay: "pipes", - Pool: &roadrunner.Config{ - NumWorkers: 1, - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - }, - }, - } - - assert.NoError(t, cfg.Valid()) -} - -func Test_Trusted_Subnets(t *testing.T) { - cfg := &Config{ - Address: ":8080", - MaxRequestSize: 1024, - Uploads: &UploadsConfig{ - Dir: os.TempDir(), - Forbid: []string{".go"}, - }, - HTTP2: &HTTP2Config{ - Enabled: true, - }, - TrustedSubnets: []string{"200.1.0.0/16"}, - Workers: &roadrunner.ServerConfig{ - Command: "php tests/client.php echo pipes", - Relay: "pipes", - Pool: &roadrunner.Config{ - NumWorkers: 1, - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - }, - }, - } - - assert.NoError(t, cfg.parseCIDRs()) - - assert.True(t, cfg.IsTrusted("200.1.0.10")) - assert.False(t, cfg.IsTrusted("127.0.0.0.1")) -} - -func Test_Trusted_Subnets_Err(t *testing.T) { - cfg := &Config{ - Address: ":8080", - MaxRequestSize: 1024, - Uploads: &UploadsConfig{ - Dir: os.TempDir(), - Forbid: []string{".go"}, - }, - HTTP2: &HTTP2Config{ - Enabled: true, - }, - TrustedSubnets: []string{"200.1.0.0"}, - Workers: &roadrunner.ServerConfig{ - Command: "php tests/client.php echo pipes", - Relay: "pipes", - Pool: &roadrunner.Config{ - NumWorkers: 1, - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - }, - }, - } - - assert.Error(t, cfg.parseCIDRs()) -} - -func Test_Config_Valid_SSL(t *testing.T) { - cfg := &Config{ - Address: ":8080", - SSL: SSLConfig{ - Cert: "fixtures/server.crt", - Key: "fixtures/server.key", - }, - MaxRequestSize: 1024, - Uploads: &UploadsConfig{ - Dir: os.TempDir(), - Forbid: []string{".go"}, - }, - HTTP2: &HTTP2Config{ - Enabled: true, - }, - Workers: &roadrunner.ServerConfig{ - Command: "php tests/client.php echo pipes", - Relay: "pipes", - Pool: &roadrunner.Config{ - NumWorkers: 1, - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - }, - }, - } - - assert.Error(t, cfg.Hydrate(&testCfg{httpCfg: "{}"})) - - assert.NoError(t, cfg.Valid()) - assert.True(t, cfg.EnableTLS()) - assert.Equal(t, 443, cfg.SSL.Port) -} - -func Test_Config_SSL_No_key(t *testing.T) { - cfg := &Config{ - Address: ":8080", - SSL: SSLConfig{ - Cert: "fixtures/server.crt", - }, - MaxRequestSize: 1024, - Uploads: &UploadsConfig{ - Dir: os.TempDir(), - Forbid: []string{".go"}, - }, - HTTP2: &HTTP2Config{ - Enabled: true, - }, - Workers: &roadrunner.ServerConfig{ - Command: "php tests/client.php echo pipes", - Relay: "pipes", - Pool: &roadrunner.Config{ - NumWorkers: 1, - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - }, - }, - } - - assert.Error(t, cfg.Valid()) -} - -func Test_Config_SSL_No_Cert(t *testing.T) { - cfg := &Config{ - Address: ":8080", - SSL: SSLConfig{ - Key: "fixtures/server.key", - }, - MaxRequestSize: 1024, - Uploads: &UploadsConfig{ - Dir: os.TempDir(), - Forbid: []string{".go"}, - }, - HTTP2: &HTTP2Config{ - Enabled: true, - }, - Workers: &roadrunner.ServerConfig{ - Command: "php tests/client.php echo pipes", - Relay: "pipes", - Pool: &roadrunner.Config{ - NumWorkers: 1, - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - }, - }, - } - - assert.Error(t, cfg.Valid()) -} - -func Test_Config_NoUploads(t *testing.T) { - cfg := &Config{ - Address: ":8080", - MaxRequestSize: 1024, - HTTP2: &HTTP2Config{ - Enabled: true, - }, - Workers: &roadrunner.ServerConfig{ - Command: "php tests/client.php echo pipes", - Relay: "pipes", - Pool: &roadrunner.Config{ - NumWorkers: 1, - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - }, - }, - } - - assert.Error(t, cfg.Valid()) -} - -func Test_Config_NoHTTP2(t *testing.T) { - cfg := &Config{ - Address: ":8080", - MaxRequestSize: 1024, - Uploads: &UploadsConfig{ - Dir: os.TempDir(), - Forbid: []string{".go"}, - }, - Workers: &roadrunner.ServerConfig{ - Command: "php tests/client.php echo pipes", - Relay: "pipes", - Pool: &roadrunner.Config{ - NumWorkers: 0, - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - }, - }, - } - - assert.Error(t, cfg.Valid()) -} - -func Test_Config_NoWorkers(t *testing.T) { - cfg := &Config{ - Address: ":8080", - MaxRequestSize: 1024, - HTTP2: &HTTP2Config{ - Enabled: true, - }, - Uploads: &UploadsConfig{ - Dir: os.TempDir(), - Forbid: []string{".go"}, - }, - } - - assert.Error(t, cfg.Valid()) -} - -func Test_Config_NoPool(t *testing.T) { - cfg := &Config{ - Address: ":8080", - MaxRequestSize: 1024, - Uploads: &UploadsConfig{ - Dir: os.TempDir(), - Forbid: []string{".go"}, - }, - HTTP2: &HTTP2Config{ - Enabled: true, - }, - Workers: &roadrunner.ServerConfig{ - Command: "php tests/client.php echo pipes", - Relay: "pipes", - Pool: &roadrunner.Config{ - NumWorkers: 0, - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - }, - }, - } - - assert.Error(t, cfg.Valid()) -} - -func Test_Config_DeadPool(t *testing.T) { - cfg := &Config{ - Address: ":8080", - MaxRequestSize: 1024, - Uploads: &UploadsConfig{ - Dir: os.TempDir(), - Forbid: []string{".go"}, - }, - HTTP2: &HTTP2Config{ - Enabled: true, - }, - Workers: &roadrunner.ServerConfig{ - Command: "php tests/client.php echo pipes", - Relay: "pipes", - }, - } - - assert.Error(t, cfg.Valid()) -} - -func Test_Config_InvalidAddress(t *testing.T) { - cfg := &Config{ - Address: "unexpected_address", - MaxRequestSize: 1024, - Uploads: &UploadsConfig{ - Dir: os.TempDir(), - Forbid: []string{".go"}, - }, - HTTP2: &HTTP2Config{ - Enabled: true, - }, - Workers: &roadrunner.ServerConfig{ - Command: "php tests/client.php echo pipes", - Relay: "pipes", - Pool: &roadrunner.Config{ - NumWorkers: 1, - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - }, - }, - } - - assert.Error(t, cfg.Valid()) -} +//import ( +// "os" +// "testing" +// "time" +// +// json "github.com/json-iterator/go" +// "github.com/stretchr/testify/assert" +//) +// +//type mockCfg struct{ cfg string } +// +//func (cfg *mockCfg) Get(name string) service.Config { return nil } +//func (cfg *mockCfg) Unmarshal(out interface{}) error { +// j := json.ConfigCompatibleWithStandardLibrary +// return j.Unmarshal([]byte(cfg.cfg), out) +//} +// +//func Test_Config_Hydrate_Error1(t *testing.T) { +// cfg := &mockCfg{`{"address": "localhost:8080"}`} +// c := &Config{} +// +// assert.NoError(t, c.Hydrate(cfg)) +//} +// +//func Test_Config_Hydrate_Error2(t *testing.T) { +// cfg := &mockCfg{`{"dir": "/dir/"`} +// c := &Config{} +// +// assert.Error(t, c.Hydrate(cfg)) +//} +// +//func Test_Config_Valid(t *testing.T) { +// cfg := &Config{ +// Address: ":8080", +// MaxRequestSize: 1024, +// HTTP2: &HTTP2Config{ +// Enabled: true, +// }, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{".go"}, +// }, +// Workers: &roadrunner.ServerConfig{ +// Command: "php tests/client.php echo pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: time.Second, +// DestroyTimeout: time.Second, +// }, +// }, +// } +// +// assert.NoError(t, cfg.Valid()) +//} +// +//func Test_Trusted_Subnets(t *testing.T) { +// cfg := &Config{ +// Address: ":8080", +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{".go"}, +// }, +// HTTP2: &HTTP2Config{ +// Enabled: true, +// }, +// TrustedSubnets: []string{"200.1.0.0/16"}, +// Workers: &roadrunner.ServerConfig{ +// Command: "php tests/client.php echo pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: time.Second, +// DestroyTimeout: time.Second, +// }, +// }, +// } +// +// assert.NoError(t, cfg.parseCIDRs()) +// +// assert.True(t, cfg.IsTrusted("200.1.0.10")) +// assert.False(t, cfg.IsTrusted("127.0.0.0.1")) +//} +// +//func Test_Trusted_Subnets_Err(t *testing.T) { +// cfg := &Config{ +// Address: ":8080", +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{".go"}, +// }, +// HTTP2: &HTTP2Config{ +// Enabled: true, +// }, +// TrustedSubnets: []string{"200.1.0.0"}, +// Workers: &roadrunner.ServerConfig{ +// Command: "php tests/client.php echo pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: time.Second, +// DestroyTimeout: time.Second, +// }, +// }, +// } +// +// assert.Error(t, cfg.parseCIDRs()) +//} +// +//func Test_Config_Valid_SSL(t *testing.T) { +// cfg := &Config{ +// Address: ":8080", +// SSL: SSLConfig{ +// Cert: "fixtures/server.crt", +// Key: "fixtures/server.key", +// }, +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{".go"}, +// }, +// HTTP2: &HTTP2Config{ +// Enabled: true, +// }, +// Workers: &roadrunner.ServerConfig{ +// Command: "php tests/client.php echo pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: time.Second, +// DestroyTimeout: time.Second, +// }, +// }, +// } +// +// assert.Error(t, cfg.Hydrate(&testCfg{httpCfg: "{}"})) +// +// assert.NoError(t, cfg.Valid()) +// assert.True(t, cfg.EnableTLS()) +// assert.Equal(t, 443, cfg.SSL.Port) +//} +// +//func Test_Config_SSL_No_key(t *testing.T) { +// cfg := &Config{ +// Address: ":8080", +// SSL: SSLConfig{ +// Cert: "fixtures/server.crt", +// }, +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{".go"}, +// }, +// HTTP2: &HTTP2Config{ +// Enabled: true, +// }, +// Workers: &roadrunner.ServerConfig{ +// Command: "php tests/client.php echo pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: time.Second, +// DestroyTimeout: time.Second, +// }, +// }, +// } +// +// assert.Error(t, cfg.Valid()) +//} +// +//func Test_Config_SSL_No_Cert(t *testing.T) { +// cfg := &Config{ +// Address: ":8080", +// SSL: SSLConfig{ +// Key: "fixtures/server.key", +// }, +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{".go"}, +// }, +// HTTP2: &HTTP2Config{ +// Enabled: true, +// }, +// Workers: &roadrunner.ServerConfig{ +// Command: "php tests/client.php echo pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: time.Second, +// DestroyTimeout: time.Second, +// }, +// }, +// } +// +// assert.Error(t, cfg.Valid()) +//} +// +//func Test_Config_NoUploads(t *testing.T) { +// cfg := &Config{ +// Address: ":8080", +// MaxRequestSize: 1024, +// HTTP2: &HTTP2Config{ +// Enabled: true, +// }, +// Workers: &roadrunner.ServerConfig{ +// Command: "php tests/client.php echo pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: time.Second, +// DestroyTimeout: time.Second, +// }, +// }, +// } +// +// assert.Error(t, cfg.Valid()) +//} +// +//func Test_Config_NoHTTP2(t *testing.T) { +// cfg := &Config{ +// Address: ":8080", +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{".go"}, +// }, +// Workers: &roadrunner.ServerConfig{ +// Command: "php tests/client.php echo pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 0, +// AllocateTimeout: time.Second, +// DestroyTimeout: time.Second, +// }, +// }, +// } +// +// assert.Error(t, cfg.Valid()) +//} +// +//func Test_Config_NoWorkers(t *testing.T) { +// cfg := &Config{ +// Address: ":8080", +// MaxRequestSize: 1024, +// HTTP2: &HTTP2Config{ +// Enabled: true, +// }, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{".go"}, +// }, +// } +// +// assert.Error(t, cfg.Valid()) +//} +// +//func Test_Config_NoPool(t *testing.T) { +// cfg := &Config{ +// Address: ":8080", +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{".go"}, +// }, +// HTTP2: &HTTP2Config{ +// Enabled: true, +// }, +// Workers: &roadrunner.ServerConfig{ +// Command: "php tests/client.php echo pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 0, +// AllocateTimeout: time.Second, +// DestroyTimeout: time.Second, +// }, +// }, +// } +// +// assert.Error(t, cfg.Valid()) +//} +// +//func Test_Config_DeadPool(t *testing.T) { +// cfg := &Config{ +// Address: ":8080", +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{".go"}, +// }, +// HTTP2: &HTTP2Config{ +// Enabled: true, +// }, +// Workers: &roadrunner.ServerConfig{ +// Command: "php tests/client.php echo pipes", +// Relay: "pipes", +// }, +// } +// +// assert.Error(t, cfg.Valid()) +//} +// +//func Test_Config_InvalidAddress(t *testing.T) { +// cfg := &Config{ +// Address: "unexpected_address", +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{".go"}, +// }, +// HTTP2: &HTTP2Config{ +// Enabled: true, +// }, +// Workers: &roadrunner.ServerConfig{ +// Command: "php tests/client.php echo pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: time.Second, +// DestroyTimeout: time.Second, +// }, +// }, +// } +// +// assert.Error(t, cfg.Valid()) +//} diff --git a/plugins/http/handler.go b/plugins/http/handler.go index 2bda4f1d..5b612d7e 100644 --- a/plugins/http/handler.go +++ b/plugins/http/handler.go @@ -10,6 +10,8 @@ import ( "time" "github.com/pkg/errors" + "github.com/spiral/roadrunner/v2" + "github.com/spiral/roadrunner/v2/interfaces/log" ) const ( @@ -60,8 +62,8 @@ func (e *ResponseEvent) Elapsed() time.Duration { // parsed files and query, payload will include parsed form dataTree (if any). type Handler struct { cfg *Config - log *logrus.Logger - rr *roadrunner.Server + log log.Logger + rr roadrunner.Pool mul sync.Mutex lsn func(event int, ctx interface{}) } diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go index 7c46c814..24eaa68c 100644 --- a/plugins/http/plugin.go +++ b/plugins/http/plugin.go @@ -4,7 +4,6 @@ import ( "context" "crypto/tls" "crypto/x509" - "errors" "fmt" "io/ioutil" "net/http" @@ -13,9 +12,11 @@ import ( "strings" "sync" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2" + factory "github.com/spiral/roadrunner/v2/interfaces/app" "github.com/spiral/roadrunner/v2/interfaces/log" "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/rpc" "golang.org/x/net/http2" "golang.org/x/net/http2/h2c" "golang.org/x/sys/cpu" @@ -23,64 +24,85 @@ import ( const ( // ID contains default service name. - ID = "http" + ServiceName = "http" // EventInitSSL thrown at moment of https initialization. SSL server passed as context. EventInitSSL = 750 ) -var couldNotAppendPemError = errors.New("could not append Certs from PEM") +//var couldNotAppendPemError = errors.New("could not append Certs from PEM") // http middleware type. type middleware func(f http.HandlerFunc) http.HandlerFunc // Service manages rr, http servers. -type Service struct { +type Plugin struct { sync.Mutex sync.WaitGroup - cfg *Config - log *logrus.Logger - cprod roadrunner.CommandProducer - env env.Environment - lsns []func(event int, ctx interface{}) - mdwr []middleware + cfg *Config + log log.Logger - rr *roadrunner.Server - controller roadrunner.Controller - handler *Handler + //cprod roadrunner.CommandProducer + env map[string]string + lsns []func(event int, ctx interface{}) + mdwr []middleware + + rr roadrunner.Pool + //controller roadrunner.Controller + handler *Handler http *http.Server https *http.Server fcgi *http.Server } -// Attach attaches controller. Currently only one controller is supported. -func (s *Service) Attach(w roadrunner.Controller) { - s.controller = w -} - -// ProduceCommands changes the default command generator method -func (s *Service) ProduceCommands(producer roadrunner.CommandProducer) { - s.cprod = producer -} +//// Attach attaches controller. Currently only one controller is supported. +//func (s *Service) Attach(w roadrunner.Controller) { +// s.controller = w +//} +// +//// ProduceCommands changes the default command generator method +//func (s *Service) ProduceCommands(producer roadrunner.CommandProducer) { +// s.cprod = producer +//} // AddMiddleware adds new net/http mdwr. -func (s *Service) AddMiddleware(m middleware) { +func (s *Plugin) AddMiddleware(m middleware) { s.mdwr = append(s.mdwr, m) } // AddListener attaches server event controller. -func (s *Service) AddListener(l func(event int, ctx interface{})) { +func (s *Plugin) AddListener(l func(event int, ctx interface{})) { s.lsns = append(s.lsns, l) } // Init must return configure svc and return true if svc hasStatus enabled. Must return error in case of // misconfiguration. Services must not be used without proper configuration pushed first. -func (s *Service) Init(cfg config.Configurer, r rpc.Plugin, log log.Logger) (bool, error) { - s.cfg = cfg +func (s *Plugin) Init(cfg config.Configurer, log log.Logger, app factory.WorkerFactory) error { + const op = errors.Op("http Init") + err := cfg.UnmarshalKey(ServiceName, &s.cfg) + if err != nil { + return errors.E(op, err) + } + s.log = log + p, err := app.NewWorkerPool(context.Background(), roadrunner.PoolConfig{ + Debug: s.cfg.Workers.PoolCfg.Debug, + NumWorkers: s.cfg.Workers.PoolCfg.NumWorkers, + MaxJobs: s.cfg.Workers.PoolCfg.MaxJobs, + AllocateTimeout: s.cfg.Workers.PoolCfg.AllocateTimeout, + DestroyTimeout: s.cfg.Workers.PoolCfg.DestroyTimeout, + Supervisor: nil, + }, nil) + + if err != nil { + return errors.E(op, err) + } + + s.rr = p + //if r != nil { // if err := r.Register(ID, &rpcServer{s}); err != nil { // return false, err @@ -91,11 +113,11 @@ func (s *Service) Init(cfg config.Configurer, r rpc.Plugin, log log.Logger) (boo // return false, nil //} - return true, nil + return nil } // Serve serves the svc. -func (s *Service) Serve() error { +func (s *Plugin) Serve() error { s.Lock() if s.env != nil { @@ -190,11 +212,12 @@ func (s *Service) Serve() error { err <- nil }() } + return <-err } // Stop stops the http. -func (s *Service) Stop() { +func (s *Plugin) Stop() { s.Lock() defer s.Unlock() @@ -240,7 +263,7 @@ func (s *Service) Stop() { } // Server returns associated rr server (if any). -func (s *Service) Server() *roadrunner.Server { +func (s *Plugin) Server() *roadrunner.Server { s.Lock() defer s.Unlock() @@ -276,7 +299,7 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) { } // append RootCA to the https server TLS config -func (s *Service) appendRootCa() error { +func (s *Plugin) appendRootCa() error { rootCAs, err := x509.SystemCertPool() if err != nil { s.throw(EventInitSSL, nil) @@ -307,7 +330,7 @@ func (s *Service) appendRootCa() error { } // Init https server -func (s *Service) initSSL() *http.Server { +func (s *Plugin) initSSL() *http.Server { var topCipherSuites []uint16 var defaultCipherSuitesTLS13 []uint16 @@ -377,14 +400,14 @@ func (s *Service) initSSL() *http.Server { } // init http/2 server -func (s *Service) initHTTP2() error { +func (s *Plugin) initHTTP2() error { return http2.ConfigureServer(s.https, &http2.Server{ MaxConcurrentStreams: s.cfg.HTTP2.MaxConcurrentStreams, }) } // serveFCGI starts FastCGI server. -func (s *Service) serveFCGI() error { +func (s *Plugin) serveFCGI() error { l, err := util.CreateListener(s.cfg.FCGI.Address) if err != nil { return err @@ -399,7 +422,7 @@ func (s *Service) serveFCGI() error { } // throw handles service, server and pool events. -func (s *Service) throw(event int, ctx interface{}) { +func (s *Plugin) throw(event int, ctx interface{}) { for _, l := range s.lsns { l(event, ctx) } @@ -411,7 +434,7 @@ func (s *Service) throw(event int, ctx interface{}) { } // tlsAddr replaces listen or host port with port configured by SSL config. -func (s *Service) tlsAddr(host string, forcePort bool) string { +func (s *Plugin) tlsAddr(host string, forcePort bool) string { // remove current forcePort first host = strings.Split(host, ":")[0] diff --git a/plugins/http/request.go b/plugins/http/request.go index b3123eb2..7e9839b2 100644 --- a/plugins/http/request.go +++ b/plugins/http/request.go @@ -9,6 +9,7 @@ import ( "strings" json "github.com/json-iterator/go" + "github.com/spiral/roadrunner/v2" "github.com/spiral/roadrunner/v2/interfaces/log" ) @@ -66,8 +67,8 @@ func fetchIP(pair string) string { } // NewRequest creates new PSR7 compatible request using net/http request. -func NewRequest(r *http.Request, cfg *UploadsConfig) (req *Request, err error) { - req = &Request{ +func NewRequest(r *http.Request, cfg *UploadsConfig) (*Request, error) { + req := &Request{ RemoteAddr: fetchIP(r.RemoteAddr), Protocol: r.Proto, Method: r.Method, @@ -75,7 +76,7 @@ func NewRequest(r *http.Request, cfg *UploadsConfig) (req *Request, err error) { Header: r.Header, Cookies: make(map[string]string), RawQuery: r.URL.RawQuery, - Attributes: attributes.All(r), + //Attributes: attributes.All(r), } for _, c := range r.Cookies() { @@ -89,18 +90,19 @@ func NewRequest(r *http.Request, cfg *UploadsConfig) (req *Request, err error) { return req, nil case contentStream: + var err error req.body, err = ioutil.ReadAll(r.Body) return req, err case contentMultipart: - if err = r.ParseMultipartForm(defaultMaxMemory); err != nil { + if err := r.ParseMultipartForm(defaultMaxMemory); err != nil { return nil, err } req.Uploads = parseUploads(r, cfg) fallthrough case contentFormData: - if err = r.ParseForm(); err != nil { + if err := r.ParseForm(); err != nil { return nil, err } @@ -121,7 +123,7 @@ func (r *Request) Open(log log.Logger) { } // Close clears all temp file uploads -func (r *Request) Close(log *logrus.Logger) { +func (r *Request) Close(log log.Logger) { if r.Uploads == nil { return } @@ -131,17 +133,17 @@ func (r *Request) Close(log *logrus.Logger) { // Payload request marshaled RoadRunner payload based on PSR7 data. values encode method is JSON. Make sure to open // files prior to calling this method. -func (r *Request) Payload() (p *roadrunner.Payload, err error) { - p = &roadrunner.Payload{} +func (r *Request) Payload() (roadrunner.Payload, error) { + p := roadrunner.Payload{} - j := json.ConfigCompatibleWithStandardLibrary - if p.Context, err = j.Marshal(r); err != nil { - return nil, err + var err error + if p.Context, err = json.Marshal(r); err != nil { + return roadrunner.EmptyPayload, err } if r.Parsed { - if p.Body, err = j.Marshal(r.body); err != nil { - return nil, err + if p.Body, err = json.Marshal(r.body); err != nil { + return roadrunner.EmptyPayload, err } } else if r.body != nil { p.Body = r.body.([]byte) diff --git a/plugins/http/response.go b/plugins/http/response.go index f34754be..88848b9d 100644 --- a/plugins/http/response.go +++ b/plugins/http/response.go @@ -6,8 +6,6 @@ import ( "strings" json "github.com/json-iterator/go" - - "github.com/spiral/roadrunner" ) diff --git a/plugins/http/test/http_test.go b/plugins/http/test/http_test.go new file mode 100644 index 00000000..c109d930 --- /dev/null +++ b/plugins/http/test/http_test.go @@ -0,0 +1,23 @@ +package test + +import ( + "testing" + + "github.com/spiral/endure" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/stretchr/testify/assert" +) + +func TestHTTPInit(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.DebugLevel), endure.Visualize(endure.StdOut, "")) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: ".rr-http.yaml", + Prefix: "", + } + err = cont.RegisterAll( + + ) + assert.NoError(t, err) +} diff --git a/plugins/http/test/rr-http.yaml b/plugins/http/test/rr-http.yaml new file mode 100644 index 00000000..8a70ce1c --- /dev/null +++ b/plugins/http/test/rr-http.yaml @@ -0,0 +1,67 @@ +http: + address: 0.0.0.0:8080 + maxRequestSize: 200 + uploads: + forbid: [ ".php", ".exe", ".bat" ] + trustedSubnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ] + + ssl: + port: 443 + redirect: true + cert: server.crt + key: server.key + rootCa: root.crt + fcgi: + address: tcp://0.0.0.0:6920 + http2: + enabled: true + h2c: true + maxConcurrentStreams: 128 + + workers: + command: "php psr-worker.php pipes" + user: "" + + # connection method (pipes, tcp://:9000, unix://socket.unix). default "pipes" + relay: "pipes" + + pool: + numWorkers: 4 + maxJobs: 0 + allocateTimeout: 60 + destroyTimeout: 60 + + +transport: + http: + address: 0.0.0.0:8080 + maxRequestSize: 200 + uploads: + forbid: [ ".php", ".exe", ".bat" ] + trustedSubnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ] + + ssl: + port: 443 + redirect: true + cert: server.crt + key: server.key + rootCa: root.crt + fcgi: + address: tcp://0.0.0.0:6920 + http2: + enabled: true + h2c: true + maxConcurrentStreams: 128 + + workers: + command: "php psr-worker.php pipes" + user: "" + + # connection method (pipes, tcp://:9000, unix://socket.unix). default "pipes" + relay: "pipes" + + pool: + numWorkers: 4 + maxJobs: 0 + allocateTimeout: 60 + destroyTimeout: 60 \ No newline at end of file diff --git a/plugins/http/uploads.go b/plugins/http/uploads.go index 99427b69..2a1524ef 100644 --- a/plugins/http/uploads.go +++ b/plugins/http/uploads.go @@ -1,8 +1,6 @@ package http import ( - "fmt" - json "github.com/json-iterator/go" "github.com/spiral/roadrunner/v2/interfaces/log" @@ -58,7 +56,7 @@ func (u *Uploads) Open(log log.Logger) { defer wg.Done() err := f.Open(u.cfg) if err != nil && log != nil { - log.Error(fmt.Errorf("error opening the file: error %v", err)) + log.Error("error opening the file", "err", err) } }(f) } @@ -67,12 +65,12 @@ func (u *Uploads) Open(log log.Logger) { } // Clear deletes all temporary files. -func (u *Uploads) Clear(log *logrus.Logger) { +func (u *Uploads) Clear(log log.Logger) { for _, f := range u.list { if f.TempFilename != "" && exists(f.TempFilename) { err := os.Remove(f.TempFilename) if err != nil && log != nil { - log.Error(fmt.Errorf("error removing the file: error %v", err)) + log.Error("error removing the file", "err", err) } } } -- cgit v1.2.3