diff options
author | Valery Piashchynski <[email protected]> | 2020-11-17 16:25:35 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2020-11-17 16:25:35 +0300 |
commit | 3cbdd3d3e44b3b4e72565d666391e3b732950774 (patch) | |
tree | 7c60fafe1c33076631e39fe26be187c9ca359a3e /plugins/http/plugin.go | |
parent | a57d064407e2ed7f35dd591101b5d421c64605e1 (diff) |
Get http working with new container
Diffstat (limited to 'plugins/http/plugin.go')
-rw-r--r-- | plugins/http/plugin.go | 252 |
1 files changed, 153 insertions, 99 deletions
diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go index 24eaa68c..94b6c74b 100644 --- a/plugins/http/plugin.go +++ b/plugins/http/plugin.go @@ -12,11 +12,13 @@ import ( "strings" "sync" + "github.com/hashicorp/go-multierror" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2" - factory "github.com/spiral/roadrunner/v2/interfaces/app" "github.com/spiral/roadrunner/v2/interfaces/log" + factory "github.com/spiral/roadrunner/v2/interfaces/server" "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/util" "golang.org/x/net/http2" "golang.org/x/net/http2/h2c" "golang.org/x/sys/cpu" @@ -79,7 +81,7 @@ func (s *Plugin) AddListener(l func(event int, ctx interface{})) { // Init must return configure svc and return true if svc hasStatus enabled. Must return error in case of // misconfiguration. Services must not be used without proper configuration pushed first. -func (s *Plugin) Init(cfg config.Configurer, log log.Logger, app factory.WorkerFactory) error { +func (s *Plugin) Init(cfg config.Configurer, log log.Logger, server factory.WorkerFactory) error { const op = errors.Op("http Init") err := cfg.UnmarshalKey(ServiceName, &s.cfg) if err != nil { @@ -88,14 +90,18 @@ func (s *Plugin) Init(cfg config.Configurer, log log.Logger, app factory.WorkerF s.log = log - p, err := app.NewWorkerPool(context.Background(), roadrunner.PoolConfig{ - Debug: s.cfg.Workers.PoolCfg.Debug, - NumWorkers: s.cfg.Workers.PoolCfg.NumWorkers, - MaxJobs: s.cfg.Workers.PoolCfg.MaxJobs, - AllocateTimeout: s.cfg.Workers.PoolCfg.AllocateTimeout, - DestroyTimeout: s.cfg.Workers.PoolCfg.DestroyTimeout, + // Set needed env vars + env := make(map[string]string) + env["RR_HTTP"] = "true" + + p, err := server.NewWorkerPool(context.Background(), roadrunner.PoolConfig{ + Debug: s.cfg.Pool.Debug, + NumWorkers: s.cfg.Pool.NumWorkers, + MaxJobs: s.cfg.Pool.MaxJobs, + AllocateTimeout: s.cfg.Pool.AllocateTimeout, + DestroyTimeout: s.cfg.Pool.DestroyTimeout, Supervisor: nil, - }, nil) + }, env) if err != nil { return errors.E(op, err) @@ -117,27 +123,29 @@ func (s *Plugin) Init(cfg config.Configurer, log log.Logger, app factory.WorkerF } // Serve serves the svc. -func (s *Plugin) Serve() error { +func (s *Plugin) Serve() chan error { s.Lock() + const op = errors.Op("serve http") + errCh := make(chan error, 2) - if s.env != nil { - if err := s.env.Copy(s.cfg.Workers); err != nil { - return nil - } - } - - s.cfg.Workers.CommandProducer = s.cprod - s.cfg.Workers.SetEnv("RR_HTTP", "true") - - s.rr = roadrunner.NewServer(s.cfg.Workers) - s.rr.Listen(s.throw) - - if s.controller != nil { - s.rr.Attach(s.controller) - } + //if s.env != nil { + // if err := s.env.Copy(s.cfg.Workers); err != nil { + // return nil + // } + //} + // + //s.cfg.Workers.CommandProducer = s.cprod + //s.cfg.Workers.SetEnv("RR_HTTP", "true") + // + //s.rr = roadrunner.NewServer(s.cfg.Workers) + //s.rr.Listen(s.throw) + // + //if s.controller != nil { + // s.rr.Attach(s.controller) + //} s.handler = &Handler{cfg: s.cfg, rr: s.rr} - s.handler.Listen(s.throw) + //s.handler.Listen(s.throw) if s.cfg.EnableHTTP() { if s.cfg.EnableH2C() { @@ -152,13 +160,15 @@ func (s *Plugin) Serve() error { if s.cfg.SSL.RootCA != "" { err := s.appendRootCa() if err != nil { - return err + errCh <- errors.E(op, err) + return errCh } } if s.cfg.EnableHTTP2() { if err := s.initHTTP2(); err != nil { - return err + errCh <- errors.E(op, err) + return errCh } } } @@ -169,21 +179,19 @@ func (s *Plugin) Serve() error { s.Unlock() - if err := s.rr.Start(); err != nil { - return err - } - defer s.rr.Stop() - - err := make(chan error, 3) + //if err := s.rr.Start(); err != nil { + // return err + //} + //defer s.rr.Stop() if s.http != nil { go func() { httpErr := s.http.ListenAndServe() if httpErr != nil && httpErr != http.ErrServerClosed { - err <- httpErr - } else { - err <- nil + errCh <- errors.E(op, httpErr) + return } + return }() } @@ -195,10 +203,10 @@ func (s *Plugin) Serve() error { ) if httpErr != nil && httpErr != http.ErrServerClosed { - err <- httpErr + errCh <- errors.E(op, httpErr) return } - err <- nil + return }() } @@ -206,72 +214,54 @@ func (s *Plugin) Serve() error { go func() { httpErr := s.serveFCGI() if httpErr != nil && httpErr != http.ErrServerClosed { - err <- httpErr + errCh <- errors.E(op, httpErr) return } - err <- nil + return }() } - return <-err + return errCh } // Stop stops the http. -func (s *Plugin) Stop() { +func (s *Plugin) Stop() error { s.Lock() defer s.Unlock() + var err error if s.fcgi != nil { - s.Add(1) - go func() { - defer s.Done() - err := s.fcgi.Shutdown(context.Background()) - if err != nil && err != http.ErrServerClosed { - // Stop() error - // push error from goroutines to the channel and block unil error or success shutdown or timeout - s.log.Error(fmt.Errorf("error shutting down the fcgi server, error: %v", err)) - return - } - }() + err = s.fcgi.Shutdown(context.Background()) + if err != nil && err != http.ErrServerClosed { + s.log.Error("error shutting down the fcgi server", "error", err) + // write error and try to stop other transport + err = multierror.Append(err) + } } if s.https != nil { - s.Add(1) - go func() { - defer s.Done() - err := s.https.Shutdown(context.Background()) - if err != nil && err != http.ErrServerClosed { - s.log.Error(fmt.Errorf("error shutting down the https server, error: %v", err)) - return - } - }() + err = s.https.Shutdown(context.Background()) + if err != nil && err != http.ErrServerClosed { + s.log.Error("error shutting down the https server", "error", err) + // write error and try to stop other transport + err = multierror.Append(err) + } } if s.http != nil { - s.Add(1) - go func() { - defer s.Done() - err := s.http.Shutdown(context.Background()) - if err != nil && err != http.ErrServerClosed { - s.log.Error(fmt.Errorf("error shutting down the http server, error: %v", err)) - return - } - }() + err = s.http.Shutdown(context.Background()) + if err != nil && err != http.ErrServerClosed { + s.log.Error("error shutting down the http server", "error", err) + // write error and try to stop other transport + err = multierror.Append(err) + } } - s.Wait() -} - -// Server returns associated rr server (if any). -func (s *Plugin) Server() *roadrunner.Server { - s.Lock() - defer s.Unlock() - - return s.rr + return err } // ServeHTTP handles connection using set of middleware and rr PSR-7 server. -func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) { +func (s *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) { if s.https != nil && r.TLS == nil && s.cfg.SSL.Redirect { target := &url.URL{ Scheme: "https", @@ -288,7 +278,7 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.Header().Add("Strict-Transport-Security", "max-age=31536000; includeSubDomains; preload") } - r = attributes.Init(r) + //r = attributes.Init(r) // chaining middleware f := s.handler.ServeHTTP @@ -300,9 +290,10 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) { // append RootCA to the https server TLS config func (s *Plugin) appendRootCa() error { + const op = errors.Op("append root CA") rootCAs, err := x509.SystemCertPool() if err != nil { - s.throw(EventInitSSL, nil) + //s.throw(EventInitSSL, nil) return nil } if rootCAs == nil { @@ -311,20 +302,20 @@ func (s *Plugin) appendRootCa() error { CA, err := ioutil.ReadFile(s.cfg.SSL.RootCA) if err != nil { - s.throw(EventInitSSL, nil) + //s.throw(EventInitSSL, nil) return err } // should append our CA cert ok := rootCAs.AppendCertsFromPEM(CA) if !ok { - return couldNotAppendPemError + return errors.E(op, errors.Str("could not append Certs from PEM")) } - config := &tls.Config{ + cfg := &tls.Config{ InsecureSkipVerify: false, RootCAs: rootCAs, } - s.http.TLSConfig = config + s.http.TLSConfig = cfg return nil } @@ -394,7 +385,7 @@ func (s *Plugin) initSSL() *http.Server { PreferServerCipherSuites: true, }, } - s.throw(EventInitSSL, server) + //s.throw(EventInitSSL, server) return server } @@ -422,16 +413,16 @@ func (s *Plugin) serveFCGI() error { } // throw handles service, server and pool events. -func (s *Plugin) throw(event int, ctx interface{}) { - for _, l := range s.lsns { - l(event, ctx) - } - - if event == roadrunner.EventServerFailure { - // underlying rr server is dead - s.Stop() - } -} +//func (s *Plugin) throw(event int, ctx interface{}) { +// for _, l := range s.lsns { +// l(event, ctx) +// } +// +// if event == roadrunner.EventServerFailure { +// // underlying rr server is dead +// s.Stop() +// } +//} // tlsAddr replaces listen or host port with port configured by SSL config. func (s *Plugin) tlsAddr(host string, forcePort bool) string { @@ -444,3 +435,66 @@ func (s *Plugin) tlsAddr(host string, forcePort bool) string { return host } + +// Server returns associated rr workers +func (s *Plugin) Workers() []roadrunner.WorkerBase { + return s.rr.Workers() +} + +func (s *Plugin) Reset() error { + // re-read the config + // destroy the pool + // attach new one + + //s.mup.Lock() + //defer s.mup.Unlock() + // + //s.mu.Lock() + //if !s.started { + // s.cfg = cfg + // s.mu.Unlock() + // return nil + //} + //s.mu.Unlock() + // + //if s.cfg.Differs(cfg) { + // return errors.New("unable to reconfigure server (cmd and pool changes are allowed)") + //} + // + //s.mu.Lock() + //previous := s.pool + //pWatcher := s.pController + //s.mu.Unlock() + // + //pool, err := NewPool(cfg.makeCommand(), s.factory, *cfg.Pool) + //if err != nil { + // return err + //} + // + //pool.Listen(s.poolListener) + // + //s.mu.Lock() + //s.cfg.Pool, s.pool = cfg.Pool, pool + // + //if s.controller != nil { + // s.pController = s.controller.Attach(pool) + //} + // + //s.mu.Unlock() + // + //s.throw(EventPoolConstruct, pool) + // + //if previous != nil { + // go func(previous Pool, pWatcher Controller) { + // s.throw(EventPoolDestruct, previous) + // if pWatcher != nil { + // pWatcher.Detach() + // } + // + // previous.Destroy() + // }(previous, pWatcher) + //} + // + //return nil + return nil +} |