diff options
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/http/config.go | 59 | ||||
-rw-r--r-- | plugins/http/plugin.go | 252 | ||||
-rw-r--r-- | plugins/http/response.go | 4 | ||||
-rw-r--r-- | plugins/http/rpc.go | 62 | ||||
-rw-r--r-- | plugins/http/test/.rr-http.yaml | 37 | ||||
-rw-r--r-- | plugins/http/test/http_test.go | 53 | ||||
-rw-r--r-- | plugins/http/test/psr-worker.php | 23 | ||||
-rw-r--r-- | plugins/http/test/rr-http.yaml | 34 | ||||
-rw-r--r-- | plugins/server/plugin.go | 78 |
9 files changed, 363 insertions, 239 deletions
diff --git a/plugins/http/config.go b/plugins/http/config.go index b3d16b62..7922f485 100644 --- a/plugins/http/config.go +++ b/plugins/http/config.go @@ -11,9 +11,6 @@ import ( "github.com/spiral/roadrunner/v2" ) -type PoolConfig struct { -} - type ServerConfig struct { // Command includes command strings with all the parameters, example: "php worker.php pipes". Command string @@ -32,7 +29,7 @@ type ServerConfig struct { // Pool defines worker pool configuration, number of workers, timeouts and etc. This config section might change // while server is running. - PoolCfg *roadrunner.PoolConfig + env map[string]string } @@ -61,8 +58,8 @@ type Config struct { // Uploads configures uploads configuration. Uploads *UploadsConfig - // Workers configures rr server and worker pool. - Workers *ServerConfig + // Pool configures worker pool. + Pool *roadrunner.PoolConfig } // FCGIConfig for FastCGI server. @@ -135,10 +132,10 @@ func (c *Config) EnableFCGI() bool { } // Hydrate must populate Config values using given Config source. Must return error if Config is not valid. -func (c *Config) Hydrate(cfg service.Config) error { - if c.Workers == nil { - c.Workers = &roadrunner.ServerConfig{} - } +func (c *Config) Hydrate(cfg Config) error { + //if c.Workers == nil { + // c.Workers = &ServerConfig{} + //} if c.HTTP2 == nil { c.HTTP2 = &HTTP2Config{} @@ -164,16 +161,16 @@ func (c *Config) Hydrate(cfg service.Config) error { if err != nil { return err } - err = c.Workers.InitDefaults() - if err != nil { - return err - } - - if err := cfg.Unmarshal(c); err != nil { - return err - } - - c.Workers.UpscaleDurations() + //err = c.Workers.InitDefaults() + //if err != nil { + // return err + //} + // + //if err := cfg.Unmarshal(c); err != nil { + // return err + //} + // + //c.Workers.UpscaleDurations() if c.TrustedSubnets == nil { // @see https://en.wikipedia.org/wiki/Reserved_IP_addresses @@ -238,17 +235,17 @@ func (c *Config) Valid() error { return errors.New("malformed http2 config") } - if c.Workers == nil { - return errors.New("malformed workers config") - } - - if c.Workers.Pool == nil { - return errors.New("malformed workers config (pool config is missing)") - } - - if err := c.Workers.Pool.Valid(); err != nil { - return err - } + //if c.Workers == nil { + // return errors.New("malformed workers config") + //} + // + //if c.Workers.Pool == nil { + // return errors.New("malformed workers config (pool config is missing)") + //} + + //if err := c.Workers.Pool.Valid(); err != nil { + // return err + //} if !c.EnableHTTP() && !c.EnableTLS() && !c.EnableFCGI() { return errors.New("unable to run http service, no method has been specified (http, https, http/2 or FastCGI)") diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go index 24eaa68c..94b6c74b 100644 --- a/plugins/http/plugin.go +++ b/plugins/http/plugin.go @@ -12,11 +12,13 @@ import ( "strings" "sync" + "github.com/hashicorp/go-multierror" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2" - factory "github.com/spiral/roadrunner/v2/interfaces/app" "github.com/spiral/roadrunner/v2/interfaces/log" + factory "github.com/spiral/roadrunner/v2/interfaces/server" "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/util" "golang.org/x/net/http2" "golang.org/x/net/http2/h2c" "golang.org/x/sys/cpu" @@ -79,7 +81,7 @@ func (s *Plugin) AddListener(l func(event int, ctx interface{})) { // Init must return configure svc and return true if svc hasStatus enabled. Must return error in case of // misconfiguration. Services must not be used without proper configuration pushed first. -func (s *Plugin) Init(cfg config.Configurer, log log.Logger, app factory.WorkerFactory) error { +func (s *Plugin) Init(cfg config.Configurer, log log.Logger, server factory.WorkerFactory) error { const op = errors.Op("http Init") err := cfg.UnmarshalKey(ServiceName, &s.cfg) if err != nil { @@ -88,14 +90,18 @@ func (s *Plugin) Init(cfg config.Configurer, log log.Logger, app factory.WorkerF s.log = log - p, err := app.NewWorkerPool(context.Background(), roadrunner.PoolConfig{ - Debug: s.cfg.Workers.PoolCfg.Debug, - NumWorkers: s.cfg.Workers.PoolCfg.NumWorkers, - MaxJobs: s.cfg.Workers.PoolCfg.MaxJobs, - AllocateTimeout: s.cfg.Workers.PoolCfg.AllocateTimeout, - DestroyTimeout: s.cfg.Workers.PoolCfg.DestroyTimeout, + // Set needed env vars + env := make(map[string]string) + env["RR_HTTP"] = "true" + + p, err := server.NewWorkerPool(context.Background(), roadrunner.PoolConfig{ + Debug: s.cfg.Pool.Debug, + NumWorkers: s.cfg.Pool.NumWorkers, + MaxJobs: s.cfg.Pool.MaxJobs, + AllocateTimeout: s.cfg.Pool.AllocateTimeout, + DestroyTimeout: s.cfg.Pool.DestroyTimeout, Supervisor: nil, - }, nil) + }, env) if err != nil { return errors.E(op, err) @@ -117,27 +123,29 @@ func (s *Plugin) Init(cfg config.Configurer, log log.Logger, app factory.WorkerF } // Serve serves the svc. -func (s *Plugin) Serve() error { +func (s *Plugin) Serve() chan error { s.Lock() + const op = errors.Op("serve http") + errCh := make(chan error, 2) - if s.env != nil { - if err := s.env.Copy(s.cfg.Workers); err != nil { - return nil - } - } - - s.cfg.Workers.CommandProducer = s.cprod - s.cfg.Workers.SetEnv("RR_HTTP", "true") - - s.rr = roadrunner.NewServer(s.cfg.Workers) - s.rr.Listen(s.throw) - - if s.controller != nil { - s.rr.Attach(s.controller) - } + //if s.env != nil { + // if err := s.env.Copy(s.cfg.Workers); err != nil { + // return nil + // } + //} + // + //s.cfg.Workers.CommandProducer = s.cprod + //s.cfg.Workers.SetEnv("RR_HTTP", "true") + // + //s.rr = roadrunner.NewServer(s.cfg.Workers) + //s.rr.Listen(s.throw) + // + //if s.controller != nil { + // s.rr.Attach(s.controller) + //} s.handler = &Handler{cfg: s.cfg, rr: s.rr} - s.handler.Listen(s.throw) + //s.handler.Listen(s.throw) if s.cfg.EnableHTTP() { if s.cfg.EnableH2C() { @@ -152,13 +160,15 @@ func (s *Plugin) Serve() error { if s.cfg.SSL.RootCA != "" { err := s.appendRootCa() if err != nil { - return err + errCh <- errors.E(op, err) + return errCh } } if s.cfg.EnableHTTP2() { if err := s.initHTTP2(); err != nil { - return err + errCh <- errors.E(op, err) + return errCh } } } @@ -169,21 +179,19 @@ func (s *Plugin) Serve() error { s.Unlock() - if err := s.rr.Start(); err != nil { - return err - } - defer s.rr.Stop() - - err := make(chan error, 3) + //if err := s.rr.Start(); err != nil { + // return err + //} + //defer s.rr.Stop() if s.http != nil { go func() { httpErr := s.http.ListenAndServe() if httpErr != nil && httpErr != http.ErrServerClosed { - err <- httpErr - } else { - err <- nil + errCh <- errors.E(op, httpErr) + return } + return }() } @@ -195,10 +203,10 @@ func (s *Plugin) Serve() error { ) if httpErr != nil && httpErr != http.ErrServerClosed { - err <- httpErr + errCh <- errors.E(op, httpErr) return } - err <- nil + return }() } @@ -206,72 +214,54 @@ func (s *Plugin) Serve() error { go func() { httpErr := s.serveFCGI() if httpErr != nil && httpErr != http.ErrServerClosed { - err <- httpErr + errCh <- errors.E(op, httpErr) return } - err <- nil + return }() } - return <-err + return errCh } // Stop stops the http. -func (s *Plugin) Stop() { +func (s *Plugin) Stop() error { s.Lock() defer s.Unlock() + var err error if s.fcgi != nil { - s.Add(1) - go func() { - defer s.Done() - err := s.fcgi.Shutdown(context.Background()) - if err != nil && err != http.ErrServerClosed { - // Stop() error - // push error from goroutines to the channel and block unil error or success shutdown or timeout - s.log.Error(fmt.Errorf("error shutting down the fcgi server, error: %v", err)) - return - } - }() + err = s.fcgi.Shutdown(context.Background()) + if err != nil && err != http.ErrServerClosed { + s.log.Error("error shutting down the fcgi server", "error", err) + // write error and try to stop other transport + err = multierror.Append(err) + } } if s.https != nil { - s.Add(1) - go func() { - defer s.Done() - err := s.https.Shutdown(context.Background()) - if err != nil && err != http.ErrServerClosed { - s.log.Error(fmt.Errorf("error shutting down the https server, error: %v", err)) - return - } - }() + err = s.https.Shutdown(context.Background()) + if err != nil && err != http.ErrServerClosed { + s.log.Error("error shutting down the https server", "error", err) + // write error and try to stop other transport + err = multierror.Append(err) + } } if s.http != nil { - s.Add(1) - go func() { - defer s.Done() - err := s.http.Shutdown(context.Background()) - if err != nil && err != http.ErrServerClosed { - s.log.Error(fmt.Errorf("error shutting down the http server, error: %v", err)) - return - } - }() + err = s.http.Shutdown(context.Background()) + if err != nil && err != http.ErrServerClosed { + s.log.Error("error shutting down the http server", "error", err) + // write error and try to stop other transport + err = multierror.Append(err) + } } - s.Wait() -} - -// Server returns associated rr server (if any). -func (s *Plugin) Server() *roadrunner.Server { - s.Lock() - defer s.Unlock() - - return s.rr + return err } // ServeHTTP handles connection using set of middleware and rr PSR-7 server. -func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) { +func (s *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) { if s.https != nil && r.TLS == nil && s.cfg.SSL.Redirect { target := &url.URL{ Scheme: "https", @@ -288,7 +278,7 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.Header().Add("Strict-Transport-Security", "max-age=31536000; includeSubDomains; preload") } - r = attributes.Init(r) + //r = attributes.Init(r) // chaining middleware f := s.handler.ServeHTTP @@ -300,9 +290,10 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) { // append RootCA to the https server TLS config func (s *Plugin) appendRootCa() error { + const op = errors.Op("append root CA") rootCAs, err := x509.SystemCertPool() if err != nil { - s.throw(EventInitSSL, nil) + //s.throw(EventInitSSL, nil) return nil } if rootCAs == nil { @@ -311,20 +302,20 @@ func (s *Plugin) appendRootCa() error { CA, err := ioutil.ReadFile(s.cfg.SSL.RootCA) if err != nil { - s.throw(EventInitSSL, nil) + //s.throw(EventInitSSL, nil) return err } // should append our CA cert ok := rootCAs.AppendCertsFromPEM(CA) if !ok { - return couldNotAppendPemError + return errors.E(op, errors.Str("could not append Certs from PEM")) } - config := &tls.Config{ + cfg := &tls.Config{ InsecureSkipVerify: false, RootCAs: rootCAs, } - s.http.TLSConfig = config + s.http.TLSConfig = cfg return nil } @@ -394,7 +385,7 @@ func (s *Plugin) initSSL() *http.Server { PreferServerCipherSuites: true, }, } - s.throw(EventInitSSL, server) + //s.throw(EventInitSSL, server) return server } @@ -422,16 +413,16 @@ func (s *Plugin) serveFCGI() error { } // throw handles service, server and pool events. -func (s *Plugin) throw(event int, ctx interface{}) { - for _, l := range s.lsns { - l(event, ctx) - } - - if event == roadrunner.EventServerFailure { - // underlying rr server is dead - s.Stop() - } -} +//func (s *Plugin) throw(event int, ctx interface{}) { +// for _, l := range s.lsns { +// l(event, ctx) +// } +// +// if event == roadrunner.EventServerFailure { +// // underlying rr server is dead +// s.Stop() +// } +//} // tlsAddr replaces listen or host port with port configured by SSL config. func (s *Plugin) tlsAddr(host string, forcePort bool) string { @@ -444,3 +435,66 @@ func (s *Plugin) tlsAddr(host string, forcePort bool) string { return host } + +// Server returns associated rr workers +func (s *Plugin) Workers() []roadrunner.WorkerBase { + return s.rr.Workers() +} + +func (s *Plugin) Reset() error { + // re-read the config + // destroy the pool + // attach new one + + //s.mup.Lock() + //defer s.mup.Unlock() + // + //s.mu.Lock() + //if !s.started { + // s.cfg = cfg + // s.mu.Unlock() + // return nil + //} + //s.mu.Unlock() + // + //if s.cfg.Differs(cfg) { + // return errors.New("unable to reconfigure server (cmd and pool changes are allowed)") + //} + // + //s.mu.Lock() + //previous := s.pool + //pWatcher := s.pController + //s.mu.Unlock() + // + //pool, err := NewPool(cfg.makeCommand(), s.factory, *cfg.Pool) + //if err != nil { + // return err + //} + // + //pool.Listen(s.poolListener) + // + //s.mu.Lock() + //s.cfg.Pool, s.pool = cfg.Pool, pool + // + //if s.controller != nil { + // s.pController = s.controller.Attach(pool) + //} + // + //s.mu.Unlock() + // + //s.throw(EventPoolConstruct, pool) + // + //if previous != nil { + // go func(previous Pool, pWatcher Controller) { + // s.throw(EventPoolDestruct, previous) + // if pWatcher != nil { + // pWatcher.Detach() + // } + // + // previous.Destroy() + // }(previous, pWatcher) + //} + // + //return nil + return nil +} diff --git a/plugins/http/response.go b/plugins/http/response.go index 88848b9d..c3de434f 100644 --- a/plugins/http/response.go +++ b/plugins/http/response.go @@ -6,9 +6,9 @@ import ( "strings" json "github.com/json-iterator/go" + "github.com/spiral/roadrunner/v2" ) - // Response handles PSR7 response logic. type Response struct { // Status contains response status. @@ -22,7 +22,7 @@ type Response struct { } // NewResponse creates new response based on given rr payload. -func NewResponse(p *roadrunner.Payload) (*Response, error) { +func NewResponse(p roadrunner.Payload) (*Response, error) { r := &Response{body: p.Body} j := json.ConfigCompatibleWithStandardLibrary if err := j.Unmarshal(p.Context, r); err != nil { diff --git a/plugins/http/rpc.go b/plugins/http/rpc.go index 7b38dece..56d8f1a1 100644 --- a/plugins/http/rpc.go +++ b/plugins/http/rpc.go @@ -1,34 +1,34 @@ package http -import ( - "github.com/pkg/errors" - "github.com/spiral/roadrunner/util" -) +//import ( +// "github.com/pkg/errors" +// "github.com/spiral/roadrunner/util" +//) -type rpcServer struct{ svc *Service } - -// WorkerList contains list of workers. -type WorkerList struct { - // Workers is list of workers. - Workers []*util.State `json:"workers"` -} - -// Reset resets underlying RR worker pool and restarts all of it's workers. -func (rpc *rpcServer) Reset(reset bool, r *string) error { - if rpc.svc == nil || rpc.svc.handler == nil { - return errors.New("http server is not running") - } - - *r = "OK" - return rpc.svc.Server().Reset() -} - -// Workers returns list of active workers and their stats. -func (rpc *rpcServer) Workers(list bool, r *WorkerList) (err error) { - if rpc.svc == nil || rpc.svc.handler == nil { - return errors.New("http server is not running") - } - - r.Workers, err = util.ServerState(rpc.svc.Server()) - return err -} +//type rpcServer struct{ svc *Plugin } +// +//// WorkerList contains list of workers. +//type WorkerList struct { +// // Workers is list of workers. +// Workers []*util.State `json:"workers"` +//} +// +//// Reset resets underlying RR worker pool and restarts all of it's workers. +//func (rpc *rpcServer) Reset(reset bool, r *string) error { +// if rpc.svc == nil || rpc.svc.handler == nil { +// return errors.New("http server is not running") +// } +// +// *r = "OK" +// return rpc.svc.Server().Reset() +//} +// +//// Workers returns list of active workers and their stats. +//func (rpc *rpcServer) Workers(list bool, r *WorkerList) (err error) { +// if rpc.svc == nil || rpc.svc.handler == nil { +// return errors.New("http server is not running") +// } +// +// r.Workers, err = util.ServerState(rpc.svc.Server()) +// return err +//} diff --git a/plugins/http/test/.rr-http.yaml b/plugins/http/test/.rr-http.yaml new file mode 100644 index 00000000..6fbfd378 --- /dev/null +++ b/plugins/http/test/.rr-http.yaml @@ -0,0 +1,37 @@ +server: + command: "php psr-worker.php" + user: "" + group: "" + env: + "RR_HTTP": "true" + relay: "pipes" + relayTimeout: "20s" + +http: + debug: true + address: 0.0.0.0:8080 + maxRequestSize: 200 + middleware: [ "" ] + uploads: + forbid: [ ".php", ".exe", ".bat" ] + trustedSubnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ] + pool: + numWorkers: 4 + maxJobs: 0 + allocateTimeout: 60s + destroyTimeout: 60s + + # ssl: + # port: 443 + # redirect: true + # cert: server.crt + # key: server.key + # rootCa: root.crt + fcgi: + address: tcp://0.0.0.0:6920 + http2: + enabled: false + h2c: false + maxConcurrentStreams: 128 + + diff --git a/plugins/http/test/http_test.go b/plugins/http/test/http_test.go index c109d930..07925d33 100644 --- a/plugins/http/test/http_test.go +++ b/plugins/http/test/http_test.go @@ -1,10 +1,18 @@ package test import ( + "os" + "os/signal" + "syscall" "testing" + "time" "github.com/spiral/endure" "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/http" + "github.com/spiral/roadrunner/v2/plugins/logger" + //rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" + "github.com/spiral/roadrunner/v2/plugins/server" "github.com/stretchr/testify/assert" ) @@ -14,10 +22,51 @@ func TestHTTPInit(t *testing.T) { cfg := &config.Viper{ Path: ".rr-http.yaml", - Prefix: "", + Prefix: "rr", } + err = cont.RegisterAll( + cfg, + //&rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + &server.Plugin{}, + &http.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } - ) + ch, err := cont.Serve() assert.NoError(t, err) + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + tt := time.NewTimer(time.Minute * 3) + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-tt.C: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } } diff --git a/plugins/http/test/psr-worker.php b/plugins/http/test/psr-worker.php new file mode 100644 index 00000000..65fc6bde --- /dev/null +++ b/plugins/http/test/psr-worker.php @@ -0,0 +1,23 @@ +<?php +/** + * @var Goridge\RelayInterface $relay + */ +use Spiral\Goridge; +use Spiral\RoadRunner; + +ini_set('display_errors', 'stderr'); +require dirname(__DIR__) . "/../../vendor_php/autoload.php"; + +$worker = new RoadRunner\Worker(new Goridge\StreamRelay(STDIN, STDOUT)); +$psr7 = new RoadRunner\PSR7Client($worker); + +while ($req = $psr7->acceptRequest()) { + try { + $resp = new \Zend\Diactoros\Response(); + $resp->getBody()->write("hello world"); + + $psr7->respond($resp); + } catch (\Throwable $e) { + $psr7->getWorker()->error((string)$e); + } +}
\ No newline at end of file diff --git a/plugins/http/test/rr-http.yaml b/plugins/http/test/rr-http.yaml deleted file mode 100644 index 8a04a1f1..00000000 --- a/plugins/http/test/rr-http.yaml +++ /dev/null @@ -1,34 +0,0 @@ -http: - address: 0.0.0.0:8080 - maxRequestSize: 200 - middlewares: [ "" ] - uploads: - forbid: [ ".php", ".exe", ".bat" ] - trustedSubnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ] - workers: - command: "php psr-worker.php pipes" - user: "" - - # connection method (pipes, tcp://:9000, unix://socket.unix). default "pipes" - relay: "pipes" - - pool: - numWorkers: 4 - maxJobs: 0 - allocateTimeout: 60 - destroyTimeout: 60 - - ssl: - port: 443 - redirect: true - cert: server.crt - key: server.key - rootCa: root.crt - fcgi: - address: tcp://0.0.0.0:6920 - http2: - enabled: true - h2c: true - maxConcurrentStreams: 128 - - diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go index e096708a..4d606390 100644 --- a/plugins/server/plugin.go +++ b/plugins/server/plugin.go @@ -25,49 +25,47 @@ type Plugin struct { } // Init application provider. -func (app *Plugin) Init(cfg config.Configurer, log log.Logger) error { +func (server *Plugin) Init(cfg config.Configurer, log log.Logger) error { const op = errors.Op("Init") - err := cfg.UnmarshalKey(ServiceName, &app.cfg) + err := cfg.UnmarshalKey(ServiceName, &server.cfg) if err != nil { return errors.E(op, errors.Init, err) } - app.cfg.InitDefaults() - app.log = log + server.cfg.InitDefaults() + server.log = log + + server.factory, err = server.initFactory() + if err != nil { + return errors.E(errors.Op("Init factory"), err) + } return nil } // Name contains service name. -func (app *Plugin) Name() string { +func (server *Plugin) Name() string { return ServiceName } -func (app *Plugin) Serve() chan error { +func (server *Plugin) Serve() chan error { errCh := make(chan error, 1) - var err error - - app.factory, err = app.initFactory() - if err != nil { - errCh <- errors.E(errors.Op("init factory"), err) - } - return errCh } -func (app *Plugin) Stop() error { - if app.factory == nil { +func (server *Plugin) Stop() error { + if server.factory == nil { return nil } - return app.factory.Close(context.Background()) + return server.factory.Close(context.Background()) } // CmdFactory provides worker command factory assocated with given context. -func (app *Plugin) CmdFactory(env server.Env) (func() *exec.Cmd, error) { +func (server *Plugin) CmdFactory(env server.Env) (func() *exec.Cmd, error) { var cmdArgs []string // create command according to the config - cmdArgs = append(cmdArgs, strings.Split(app.cfg.Command, " ")...) + cmdArgs = append(cmdArgs, strings.Split(server.cfg.Command, " ")...) return func() *exec.Cmd { cmd := exec.Command(cmdArgs[0], cmdArgs[1:]...) @@ -75,67 +73,67 @@ func (app *Plugin) CmdFactory(env server.Env) (func() *exec.Cmd, error) { // if user is not empty, and OS is linux or macos // execute php worker from that particular user - if app.cfg.User != "" { - err := util.ExecuteFromUser(cmd, app.cfg.User) + if server.cfg.User != "" { + err := util.ExecuteFromUser(cmd, server.cfg.User) if err != nil { return nil } } - cmd.Env = app.setEnv(env) + cmd.Env = server.setEnv(env) return cmd }, nil } // NewWorker issues new standalone worker. -func (app *Plugin) NewWorker(ctx context.Context, env server.Env) (roadrunner.WorkerBase, error) { +func (server *Plugin) NewWorker(ctx context.Context, env server.Env) (roadrunner.WorkerBase, error) { const op = errors.Op("new worker") - spawnCmd, err := app.CmdFactory(env) + spawnCmd, err := server.CmdFactory(env) if err != nil { return nil, errors.E(op, err) } - w, err := app.factory.SpawnWorkerWithContext(ctx, spawnCmd()) + w, err := server.factory.SpawnWorkerWithContext(ctx, spawnCmd()) if err != nil { return nil, errors.E(op, err) } - w.AddListener(app.collectLogs) + w.AddListener(server.collectLogs) return w, nil } // NewWorkerPool issues new worker pool. -func (app *Plugin) NewWorkerPool(ctx context.Context, opt roadrunner.PoolConfig, env server.Env) (roadrunner.Pool, error) { - spawnCmd, err := app.CmdFactory(env) +func (server *Plugin) NewWorkerPool(ctx context.Context, opt roadrunner.PoolConfig, env server.Env) (roadrunner.Pool, error) { + spawnCmd, err := server.CmdFactory(env) if err != nil { return nil, err } - p, err := roadrunner.NewPool(ctx, spawnCmd, app.factory, opt) + p, err := roadrunner.NewPool(ctx, spawnCmd, server.factory, opt) if err != nil { return nil, err } - p.AddListener(app.collectLogs) + p.AddListener(server.collectLogs) return p, nil } // creates relay and worker factory. -func (app *Plugin) initFactory() (roadrunner.Factory, error) { +func (server *Plugin) initFactory() (roadrunner.Factory, error) { const op = errors.Op("network factory init") - if app.cfg.Relay == "" || app.cfg.Relay == "pipes" { + if server.cfg.Relay == "" || server.cfg.Relay == "pipes" { return roadrunner.NewPipeFactory(), nil } - dsn := strings.Split(app.cfg.Relay, "://") + dsn := strings.Split(server.cfg.Relay, "://") if len(dsn) != 2 { return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)")) } - lsn, err := util.CreateListener(app.cfg.Relay) + lsn, err := util.CreateListener(server.cfg.Relay) if err != nil { return nil, errors.E(op, errors.Network, err) } @@ -143,16 +141,16 @@ func (app *Plugin) initFactory() (roadrunner.Factory, error) { switch dsn[0] { // sockets group case "unix": - return roadrunner.NewSocketServer(lsn, app.cfg.RelayTimeout), nil + return roadrunner.NewSocketServer(lsn, server.cfg.RelayTimeout), nil case "tcp": - return roadrunner.NewSocketServer(lsn, app.cfg.RelayTimeout), nil + return roadrunner.NewSocketServer(lsn, server.cfg.RelayTimeout), nil default: return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)")) } } -func (app *Plugin) setEnv(e server.Env) []string { - env := append(os.Environ(), fmt.Sprintf("RR_RELAY=%s", app.cfg.Relay)) +func (server *Plugin) setEnv(e server.Env) []string { + env := append(os.Environ(), fmt.Sprintf("RR_RELAY=%s", server.cfg.Relay)) for k, v := range e { env = append(env, fmt.Sprintf("%s=%s", strings.ToUpper(k), v)) } @@ -160,13 +158,13 @@ func (app *Plugin) setEnv(e server.Env) []string { return env } -func (app *Plugin) collectLogs(event interface{}) { +func (server *Plugin) collectLogs(event interface{}) { if we, ok := event.(roadrunner.WorkerEvent); ok { switch we.Event { case roadrunner.EventWorkerError: - app.log.Error(we.Payload.(error).Error(), "pid", we.Worker.Pid()) + server.log.Error(we.Payload.(error).Error(), "pid", we.Worker.Pid()) case roadrunner.EventWorkerLog: - app.log.Debug(strings.TrimRight(string(we.Payload.([]byte)), " \n\t"), "pid", we.Worker.Pid()) + server.log.Debug(strings.TrimRight(string(we.Payload.([]byte)), " \n\t"), "pid", we.Worker.Pid()) } } } |