summaryrefslogtreecommitdiff
path: root/plugins/http/plugin.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-11-17 16:25:35 +0300
committerValery Piashchynski <[email protected]>2020-11-17 16:25:35 +0300
commit3cbdd3d3e44b3b4e72565d666391e3b732950774 (patch)
tree7c60fafe1c33076631e39fe26be187c9ca359a3e /plugins/http/plugin.go
parenta57d064407e2ed7f35dd591101b5d421c64605e1 (diff)
Get http working with new container
Diffstat (limited to 'plugins/http/plugin.go')
-rw-r--r--plugins/http/plugin.go252
1 files changed, 153 insertions, 99 deletions
diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go
index 24eaa68c..94b6c74b 100644
--- a/plugins/http/plugin.go
+++ b/plugins/http/plugin.go
@@ -12,11 +12,13 @@ import (
"strings"
"sync"
+ "github.com/hashicorp/go-multierror"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2"
- factory "github.com/spiral/roadrunner/v2/interfaces/app"
"github.com/spiral/roadrunner/v2/interfaces/log"
+ factory "github.com/spiral/roadrunner/v2/interfaces/server"
"github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/util"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
"golang.org/x/sys/cpu"
@@ -79,7 +81,7 @@ func (s *Plugin) AddListener(l func(event int, ctx interface{})) {
// Init must return configure svc and return true if svc hasStatus enabled. Must return error in case of
// misconfiguration. Services must not be used without proper configuration pushed first.
-func (s *Plugin) Init(cfg config.Configurer, log log.Logger, app factory.WorkerFactory) error {
+func (s *Plugin) Init(cfg config.Configurer, log log.Logger, server factory.WorkerFactory) error {
const op = errors.Op("http Init")
err := cfg.UnmarshalKey(ServiceName, &s.cfg)
if err != nil {
@@ -88,14 +90,18 @@ func (s *Plugin) Init(cfg config.Configurer, log log.Logger, app factory.WorkerF
s.log = log
- p, err := app.NewWorkerPool(context.Background(), roadrunner.PoolConfig{
- Debug: s.cfg.Workers.PoolCfg.Debug,
- NumWorkers: s.cfg.Workers.PoolCfg.NumWorkers,
- MaxJobs: s.cfg.Workers.PoolCfg.MaxJobs,
- AllocateTimeout: s.cfg.Workers.PoolCfg.AllocateTimeout,
- DestroyTimeout: s.cfg.Workers.PoolCfg.DestroyTimeout,
+ // Set needed env vars
+ env := make(map[string]string)
+ env["RR_HTTP"] = "true"
+
+ p, err := server.NewWorkerPool(context.Background(), roadrunner.PoolConfig{
+ Debug: s.cfg.Pool.Debug,
+ NumWorkers: s.cfg.Pool.NumWorkers,
+ MaxJobs: s.cfg.Pool.MaxJobs,
+ AllocateTimeout: s.cfg.Pool.AllocateTimeout,
+ DestroyTimeout: s.cfg.Pool.DestroyTimeout,
Supervisor: nil,
- }, nil)
+ }, env)
if err != nil {
return errors.E(op, err)
@@ -117,27 +123,29 @@ func (s *Plugin) Init(cfg config.Configurer, log log.Logger, app factory.WorkerF
}
// Serve serves the svc.
-func (s *Plugin) Serve() error {
+func (s *Plugin) Serve() chan error {
s.Lock()
+ const op = errors.Op("serve http")
+ errCh := make(chan error, 2)
- if s.env != nil {
- if err := s.env.Copy(s.cfg.Workers); err != nil {
- return nil
- }
- }
-
- s.cfg.Workers.CommandProducer = s.cprod
- s.cfg.Workers.SetEnv("RR_HTTP", "true")
-
- s.rr = roadrunner.NewServer(s.cfg.Workers)
- s.rr.Listen(s.throw)
-
- if s.controller != nil {
- s.rr.Attach(s.controller)
- }
+ //if s.env != nil {
+ // if err := s.env.Copy(s.cfg.Workers); err != nil {
+ // return nil
+ // }
+ //}
+ //
+ //s.cfg.Workers.CommandProducer = s.cprod
+ //s.cfg.Workers.SetEnv("RR_HTTP", "true")
+ //
+ //s.rr = roadrunner.NewServer(s.cfg.Workers)
+ //s.rr.Listen(s.throw)
+ //
+ //if s.controller != nil {
+ // s.rr.Attach(s.controller)
+ //}
s.handler = &Handler{cfg: s.cfg, rr: s.rr}
- s.handler.Listen(s.throw)
+ //s.handler.Listen(s.throw)
if s.cfg.EnableHTTP() {
if s.cfg.EnableH2C() {
@@ -152,13 +160,15 @@ func (s *Plugin) Serve() error {
if s.cfg.SSL.RootCA != "" {
err := s.appendRootCa()
if err != nil {
- return err
+ errCh <- errors.E(op, err)
+ return errCh
}
}
if s.cfg.EnableHTTP2() {
if err := s.initHTTP2(); err != nil {
- return err
+ errCh <- errors.E(op, err)
+ return errCh
}
}
}
@@ -169,21 +179,19 @@ func (s *Plugin) Serve() error {
s.Unlock()
- if err := s.rr.Start(); err != nil {
- return err
- }
- defer s.rr.Stop()
-
- err := make(chan error, 3)
+ //if err := s.rr.Start(); err != nil {
+ // return err
+ //}
+ //defer s.rr.Stop()
if s.http != nil {
go func() {
httpErr := s.http.ListenAndServe()
if httpErr != nil && httpErr != http.ErrServerClosed {
- err <- httpErr
- } else {
- err <- nil
+ errCh <- errors.E(op, httpErr)
+ return
}
+ return
}()
}
@@ -195,10 +203,10 @@ func (s *Plugin) Serve() error {
)
if httpErr != nil && httpErr != http.ErrServerClosed {
- err <- httpErr
+ errCh <- errors.E(op, httpErr)
return
}
- err <- nil
+ return
}()
}
@@ -206,72 +214,54 @@ func (s *Plugin) Serve() error {
go func() {
httpErr := s.serveFCGI()
if httpErr != nil && httpErr != http.ErrServerClosed {
- err <- httpErr
+ errCh <- errors.E(op, httpErr)
return
}
- err <- nil
+ return
}()
}
- return <-err
+ return errCh
}
// Stop stops the http.
-func (s *Plugin) Stop() {
+func (s *Plugin) Stop() error {
s.Lock()
defer s.Unlock()
+ var err error
if s.fcgi != nil {
- s.Add(1)
- go func() {
- defer s.Done()
- err := s.fcgi.Shutdown(context.Background())
- if err != nil && err != http.ErrServerClosed {
- // Stop() error
- // push error from goroutines to the channel and block unil error or success shutdown or timeout
- s.log.Error(fmt.Errorf("error shutting down the fcgi server, error: %v", err))
- return
- }
- }()
+ err = s.fcgi.Shutdown(context.Background())
+ if err != nil && err != http.ErrServerClosed {
+ s.log.Error("error shutting down the fcgi server", "error", err)
+ // write error and try to stop other transport
+ err = multierror.Append(err)
+ }
}
if s.https != nil {
- s.Add(1)
- go func() {
- defer s.Done()
- err := s.https.Shutdown(context.Background())
- if err != nil && err != http.ErrServerClosed {
- s.log.Error(fmt.Errorf("error shutting down the https server, error: %v", err))
- return
- }
- }()
+ err = s.https.Shutdown(context.Background())
+ if err != nil && err != http.ErrServerClosed {
+ s.log.Error("error shutting down the https server", "error", err)
+ // write error and try to stop other transport
+ err = multierror.Append(err)
+ }
}
if s.http != nil {
- s.Add(1)
- go func() {
- defer s.Done()
- err := s.http.Shutdown(context.Background())
- if err != nil && err != http.ErrServerClosed {
- s.log.Error(fmt.Errorf("error shutting down the http server, error: %v", err))
- return
- }
- }()
+ err = s.http.Shutdown(context.Background())
+ if err != nil && err != http.ErrServerClosed {
+ s.log.Error("error shutting down the http server", "error", err)
+ // write error and try to stop other transport
+ err = multierror.Append(err)
+ }
}
- s.Wait()
-}
-
-// Server returns associated rr server (if any).
-func (s *Plugin) Server() *roadrunner.Server {
- s.Lock()
- defer s.Unlock()
-
- return s.rr
+ return err
}
// ServeHTTP handles connection using set of middleware and rr PSR-7 server.
-func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+func (s *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if s.https != nil && r.TLS == nil && s.cfg.SSL.Redirect {
target := &url.URL{
Scheme: "https",
@@ -288,7 +278,7 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Header().Add("Strict-Transport-Security", "max-age=31536000; includeSubDomains; preload")
}
- r = attributes.Init(r)
+ //r = attributes.Init(r)
// chaining middleware
f := s.handler.ServeHTTP
@@ -300,9 +290,10 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// append RootCA to the https server TLS config
func (s *Plugin) appendRootCa() error {
+ const op = errors.Op("append root CA")
rootCAs, err := x509.SystemCertPool()
if err != nil {
- s.throw(EventInitSSL, nil)
+ //s.throw(EventInitSSL, nil)
return nil
}
if rootCAs == nil {
@@ -311,20 +302,20 @@ func (s *Plugin) appendRootCa() error {
CA, err := ioutil.ReadFile(s.cfg.SSL.RootCA)
if err != nil {
- s.throw(EventInitSSL, nil)
+ //s.throw(EventInitSSL, nil)
return err
}
// should append our CA cert
ok := rootCAs.AppendCertsFromPEM(CA)
if !ok {
- return couldNotAppendPemError
+ return errors.E(op, errors.Str("could not append Certs from PEM"))
}
- config := &tls.Config{
+ cfg := &tls.Config{
InsecureSkipVerify: false,
RootCAs: rootCAs,
}
- s.http.TLSConfig = config
+ s.http.TLSConfig = cfg
return nil
}
@@ -394,7 +385,7 @@ func (s *Plugin) initSSL() *http.Server {
PreferServerCipherSuites: true,
},
}
- s.throw(EventInitSSL, server)
+ //s.throw(EventInitSSL, server)
return server
}
@@ -422,16 +413,16 @@ func (s *Plugin) serveFCGI() error {
}
// throw handles service, server and pool events.
-func (s *Plugin) throw(event int, ctx interface{}) {
- for _, l := range s.lsns {
- l(event, ctx)
- }
-
- if event == roadrunner.EventServerFailure {
- // underlying rr server is dead
- s.Stop()
- }
-}
+//func (s *Plugin) throw(event int, ctx interface{}) {
+// for _, l := range s.lsns {
+// l(event, ctx)
+// }
+//
+// if event == roadrunner.EventServerFailure {
+// // underlying rr server is dead
+// s.Stop()
+// }
+//}
// tlsAddr replaces listen or host port with port configured by SSL config.
func (s *Plugin) tlsAddr(host string, forcePort bool) string {
@@ -444,3 +435,66 @@ func (s *Plugin) tlsAddr(host string, forcePort bool) string {
return host
}
+
+// Server returns associated rr workers
+func (s *Plugin) Workers() []roadrunner.WorkerBase {
+ return s.rr.Workers()
+}
+
+func (s *Plugin) Reset() error {
+ // re-read the config
+ // destroy the pool
+ // attach new one
+
+ //s.mup.Lock()
+ //defer s.mup.Unlock()
+ //
+ //s.mu.Lock()
+ //if !s.started {
+ // s.cfg = cfg
+ // s.mu.Unlock()
+ // return nil
+ //}
+ //s.mu.Unlock()
+ //
+ //if s.cfg.Differs(cfg) {
+ // return errors.New("unable to reconfigure server (cmd and pool changes are allowed)")
+ //}
+ //
+ //s.mu.Lock()
+ //previous := s.pool
+ //pWatcher := s.pController
+ //s.mu.Unlock()
+ //
+ //pool, err := NewPool(cfg.makeCommand(), s.factory, *cfg.Pool)
+ //if err != nil {
+ // return err
+ //}
+ //
+ //pool.Listen(s.poolListener)
+ //
+ //s.mu.Lock()
+ //s.cfg.Pool, s.pool = cfg.Pool, pool
+ //
+ //if s.controller != nil {
+ // s.pController = s.controller.Attach(pool)
+ //}
+ //
+ //s.mu.Unlock()
+ //
+ //s.throw(EventPoolConstruct, pool)
+ //
+ //if previous != nil {
+ // go func(previous Pool, pWatcher Controller) {
+ // s.throw(EventPoolDestruct, previous)
+ // if pWatcher != nil {
+ // pWatcher.Detach()
+ // }
+ //
+ // previous.Destroy()
+ // }(previous, pWatcher)
+ //}
+ //
+ //return nil
+ return nil
+}