summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'plugins')
-rw-r--r--plugins/http/config.go59
-rw-r--r--plugins/http/plugin.go252
-rw-r--r--plugins/http/response.go4
-rw-r--r--plugins/http/rpc.go62
-rw-r--r--plugins/http/test/.rr-http.yaml37
-rw-r--r--plugins/http/test/http_test.go53
-rw-r--r--plugins/http/test/psr-worker.php23
-rw-r--r--plugins/http/test/rr-http.yaml34
-rw-r--r--plugins/server/plugin.go78
9 files changed, 363 insertions, 239 deletions
diff --git a/plugins/http/config.go b/plugins/http/config.go
index b3d16b62..7922f485 100644
--- a/plugins/http/config.go
+++ b/plugins/http/config.go
@@ -11,9 +11,6 @@ import (
"github.com/spiral/roadrunner/v2"
)
-type PoolConfig struct {
-}
-
type ServerConfig struct {
// Command includes command strings with all the parameters, example: "php worker.php pipes".
Command string
@@ -32,7 +29,7 @@ type ServerConfig struct {
// Pool defines worker pool configuration, number of workers, timeouts and etc. This config section might change
// while server is running.
- PoolCfg *roadrunner.PoolConfig
+
env map[string]string
}
@@ -61,8 +58,8 @@ type Config struct {
// Uploads configures uploads configuration.
Uploads *UploadsConfig
- // Workers configures rr server and worker pool.
- Workers *ServerConfig
+ // Pool configures worker pool.
+ Pool *roadrunner.PoolConfig
}
// FCGIConfig for FastCGI server.
@@ -135,10 +132,10 @@ func (c *Config) EnableFCGI() bool {
}
// Hydrate must populate Config values using given Config source. Must return error if Config is not valid.
-func (c *Config) Hydrate(cfg service.Config) error {
- if c.Workers == nil {
- c.Workers = &roadrunner.ServerConfig{}
- }
+func (c *Config) Hydrate(cfg Config) error {
+ //if c.Workers == nil {
+ // c.Workers = &ServerConfig{}
+ //}
if c.HTTP2 == nil {
c.HTTP2 = &HTTP2Config{}
@@ -164,16 +161,16 @@ func (c *Config) Hydrate(cfg service.Config) error {
if err != nil {
return err
}
- err = c.Workers.InitDefaults()
- if err != nil {
- return err
- }
-
- if err := cfg.Unmarshal(c); err != nil {
- return err
- }
-
- c.Workers.UpscaleDurations()
+ //err = c.Workers.InitDefaults()
+ //if err != nil {
+ // return err
+ //}
+ //
+ //if err := cfg.Unmarshal(c); err != nil {
+ // return err
+ //}
+ //
+ //c.Workers.UpscaleDurations()
if c.TrustedSubnets == nil {
// @see https://en.wikipedia.org/wiki/Reserved_IP_addresses
@@ -238,17 +235,17 @@ func (c *Config) Valid() error {
return errors.New("malformed http2 config")
}
- if c.Workers == nil {
- return errors.New("malformed workers config")
- }
-
- if c.Workers.Pool == nil {
- return errors.New("malformed workers config (pool config is missing)")
- }
-
- if err := c.Workers.Pool.Valid(); err != nil {
- return err
- }
+ //if c.Workers == nil {
+ // return errors.New("malformed workers config")
+ //}
+ //
+ //if c.Workers.Pool == nil {
+ // return errors.New("malformed workers config (pool config is missing)")
+ //}
+
+ //if err := c.Workers.Pool.Valid(); err != nil {
+ // return err
+ //}
if !c.EnableHTTP() && !c.EnableTLS() && !c.EnableFCGI() {
return errors.New("unable to run http service, no method has been specified (http, https, http/2 or FastCGI)")
diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go
index 24eaa68c..94b6c74b 100644
--- a/plugins/http/plugin.go
+++ b/plugins/http/plugin.go
@@ -12,11 +12,13 @@ import (
"strings"
"sync"
+ "github.com/hashicorp/go-multierror"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2"
- factory "github.com/spiral/roadrunner/v2/interfaces/app"
"github.com/spiral/roadrunner/v2/interfaces/log"
+ factory "github.com/spiral/roadrunner/v2/interfaces/server"
"github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/util"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
"golang.org/x/sys/cpu"
@@ -79,7 +81,7 @@ func (s *Plugin) AddListener(l func(event int, ctx interface{})) {
// Init must return configure svc and return true if svc hasStatus enabled. Must return error in case of
// misconfiguration. Services must not be used without proper configuration pushed first.
-func (s *Plugin) Init(cfg config.Configurer, log log.Logger, app factory.WorkerFactory) error {
+func (s *Plugin) Init(cfg config.Configurer, log log.Logger, server factory.WorkerFactory) error {
const op = errors.Op("http Init")
err := cfg.UnmarshalKey(ServiceName, &s.cfg)
if err != nil {
@@ -88,14 +90,18 @@ func (s *Plugin) Init(cfg config.Configurer, log log.Logger, app factory.WorkerF
s.log = log
- p, err := app.NewWorkerPool(context.Background(), roadrunner.PoolConfig{
- Debug: s.cfg.Workers.PoolCfg.Debug,
- NumWorkers: s.cfg.Workers.PoolCfg.NumWorkers,
- MaxJobs: s.cfg.Workers.PoolCfg.MaxJobs,
- AllocateTimeout: s.cfg.Workers.PoolCfg.AllocateTimeout,
- DestroyTimeout: s.cfg.Workers.PoolCfg.DestroyTimeout,
+ // Set needed env vars
+ env := make(map[string]string)
+ env["RR_HTTP"] = "true"
+
+ p, err := server.NewWorkerPool(context.Background(), roadrunner.PoolConfig{
+ Debug: s.cfg.Pool.Debug,
+ NumWorkers: s.cfg.Pool.NumWorkers,
+ MaxJobs: s.cfg.Pool.MaxJobs,
+ AllocateTimeout: s.cfg.Pool.AllocateTimeout,
+ DestroyTimeout: s.cfg.Pool.DestroyTimeout,
Supervisor: nil,
- }, nil)
+ }, env)
if err != nil {
return errors.E(op, err)
@@ -117,27 +123,29 @@ func (s *Plugin) Init(cfg config.Configurer, log log.Logger, app factory.WorkerF
}
// Serve serves the svc.
-func (s *Plugin) Serve() error {
+func (s *Plugin) Serve() chan error {
s.Lock()
+ const op = errors.Op("serve http")
+ errCh := make(chan error, 2)
- if s.env != nil {
- if err := s.env.Copy(s.cfg.Workers); err != nil {
- return nil
- }
- }
-
- s.cfg.Workers.CommandProducer = s.cprod
- s.cfg.Workers.SetEnv("RR_HTTP", "true")
-
- s.rr = roadrunner.NewServer(s.cfg.Workers)
- s.rr.Listen(s.throw)
-
- if s.controller != nil {
- s.rr.Attach(s.controller)
- }
+ //if s.env != nil {
+ // if err := s.env.Copy(s.cfg.Workers); err != nil {
+ // return nil
+ // }
+ //}
+ //
+ //s.cfg.Workers.CommandProducer = s.cprod
+ //s.cfg.Workers.SetEnv("RR_HTTP", "true")
+ //
+ //s.rr = roadrunner.NewServer(s.cfg.Workers)
+ //s.rr.Listen(s.throw)
+ //
+ //if s.controller != nil {
+ // s.rr.Attach(s.controller)
+ //}
s.handler = &Handler{cfg: s.cfg, rr: s.rr}
- s.handler.Listen(s.throw)
+ //s.handler.Listen(s.throw)
if s.cfg.EnableHTTP() {
if s.cfg.EnableH2C() {
@@ -152,13 +160,15 @@ func (s *Plugin) Serve() error {
if s.cfg.SSL.RootCA != "" {
err := s.appendRootCa()
if err != nil {
- return err
+ errCh <- errors.E(op, err)
+ return errCh
}
}
if s.cfg.EnableHTTP2() {
if err := s.initHTTP2(); err != nil {
- return err
+ errCh <- errors.E(op, err)
+ return errCh
}
}
}
@@ -169,21 +179,19 @@ func (s *Plugin) Serve() error {
s.Unlock()
- if err := s.rr.Start(); err != nil {
- return err
- }
- defer s.rr.Stop()
-
- err := make(chan error, 3)
+ //if err := s.rr.Start(); err != nil {
+ // return err
+ //}
+ //defer s.rr.Stop()
if s.http != nil {
go func() {
httpErr := s.http.ListenAndServe()
if httpErr != nil && httpErr != http.ErrServerClosed {
- err <- httpErr
- } else {
- err <- nil
+ errCh <- errors.E(op, httpErr)
+ return
}
+ return
}()
}
@@ -195,10 +203,10 @@ func (s *Plugin) Serve() error {
)
if httpErr != nil && httpErr != http.ErrServerClosed {
- err <- httpErr
+ errCh <- errors.E(op, httpErr)
return
}
- err <- nil
+ return
}()
}
@@ -206,72 +214,54 @@ func (s *Plugin) Serve() error {
go func() {
httpErr := s.serveFCGI()
if httpErr != nil && httpErr != http.ErrServerClosed {
- err <- httpErr
+ errCh <- errors.E(op, httpErr)
return
}
- err <- nil
+ return
}()
}
- return <-err
+ return errCh
}
// Stop stops the http.
-func (s *Plugin) Stop() {
+func (s *Plugin) Stop() error {
s.Lock()
defer s.Unlock()
+ var err error
if s.fcgi != nil {
- s.Add(1)
- go func() {
- defer s.Done()
- err := s.fcgi.Shutdown(context.Background())
- if err != nil && err != http.ErrServerClosed {
- // Stop() error
- // push error from goroutines to the channel and block unil error or success shutdown or timeout
- s.log.Error(fmt.Errorf("error shutting down the fcgi server, error: %v", err))
- return
- }
- }()
+ err = s.fcgi.Shutdown(context.Background())
+ if err != nil && err != http.ErrServerClosed {
+ s.log.Error("error shutting down the fcgi server", "error", err)
+ // write error and try to stop other transport
+ err = multierror.Append(err)
+ }
}
if s.https != nil {
- s.Add(1)
- go func() {
- defer s.Done()
- err := s.https.Shutdown(context.Background())
- if err != nil && err != http.ErrServerClosed {
- s.log.Error(fmt.Errorf("error shutting down the https server, error: %v", err))
- return
- }
- }()
+ err = s.https.Shutdown(context.Background())
+ if err != nil && err != http.ErrServerClosed {
+ s.log.Error("error shutting down the https server", "error", err)
+ // write error and try to stop other transport
+ err = multierror.Append(err)
+ }
}
if s.http != nil {
- s.Add(1)
- go func() {
- defer s.Done()
- err := s.http.Shutdown(context.Background())
- if err != nil && err != http.ErrServerClosed {
- s.log.Error(fmt.Errorf("error shutting down the http server, error: %v", err))
- return
- }
- }()
+ err = s.http.Shutdown(context.Background())
+ if err != nil && err != http.ErrServerClosed {
+ s.log.Error("error shutting down the http server", "error", err)
+ // write error and try to stop other transport
+ err = multierror.Append(err)
+ }
}
- s.Wait()
-}
-
-// Server returns associated rr server (if any).
-func (s *Plugin) Server() *roadrunner.Server {
- s.Lock()
- defer s.Unlock()
-
- return s.rr
+ return err
}
// ServeHTTP handles connection using set of middleware and rr PSR-7 server.
-func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+func (s *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if s.https != nil && r.TLS == nil && s.cfg.SSL.Redirect {
target := &url.URL{
Scheme: "https",
@@ -288,7 +278,7 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Header().Add("Strict-Transport-Security", "max-age=31536000; includeSubDomains; preload")
}
- r = attributes.Init(r)
+ //r = attributes.Init(r)
// chaining middleware
f := s.handler.ServeHTTP
@@ -300,9 +290,10 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// append RootCA to the https server TLS config
func (s *Plugin) appendRootCa() error {
+ const op = errors.Op("append root CA")
rootCAs, err := x509.SystemCertPool()
if err != nil {
- s.throw(EventInitSSL, nil)
+ //s.throw(EventInitSSL, nil)
return nil
}
if rootCAs == nil {
@@ -311,20 +302,20 @@ func (s *Plugin) appendRootCa() error {
CA, err := ioutil.ReadFile(s.cfg.SSL.RootCA)
if err != nil {
- s.throw(EventInitSSL, nil)
+ //s.throw(EventInitSSL, nil)
return err
}
// should append our CA cert
ok := rootCAs.AppendCertsFromPEM(CA)
if !ok {
- return couldNotAppendPemError
+ return errors.E(op, errors.Str("could not append Certs from PEM"))
}
- config := &tls.Config{
+ cfg := &tls.Config{
InsecureSkipVerify: false,
RootCAs: rootCAs,
}
- s.http.TLSConfig = config
+ s.http.TLSConfig = cfg
return nil
}
@@ -394,7 +385,7 @@ func (s *Plugin) initSSL() *http.Server {
PreferServerCipherSuites: true,
},
}
- s.throw(EventInitSSL, server)
+ //s.throw(EventInitSSL, server)
return server
}
@@ -422,16 +413,16 @@ func (s *Plugin) serveFCGI() error {
}
// throw handles service, server and pool events.
-func (s *Plugin) throw(event int, ctx interface{}) {
- for _, l := range s.lsns {
- l(event, ctx)
- }
-
- if event == roadrunner.EventServerFailure {
- // underlying rr server is dead
- s.Stop()
- }
-}
+//func (s *Plugin) throw(event int, ctx interface{}) {
+// for _, l := range s.lsns {
+// l(event, ctx)
+// }
+//
+// if event == roadrunner.EventServerFailure {
+// // underlying rr server is dead
+// s.Stop()
+// }
+//}
// tlsAddr replaces listen or host port with port configured by SSL config.
func (s *Plugin) tlsAddr(host string, forcePort bool) string {
@@ -444,3 +435,66 @@ func (s *Plugin) tlsAddr(host string, forcePort bool) string {
return host
}
+
+// Server returns associated rr workers
+func (s *Plugin) Workers() []roadrunner.WorkerBase {
+ return s.rr.Workers()
+}
+
+func (s *Plugin) Reset() error {
+ // re-read the config
+ // destroy the pool
+ // attach new one
+
+ //s.mup.Lock()
+ //defer s.mup.Unlock()
+ //
+ //s.mu.Lock()
+ //if !s.started {
+ // s.cfg = cfg
+ // s.mu.Unlock()
+ // return nil
+ //}
+ //s.mu.Unlock()
+ //
+ //if s.cfg.Differs(cfg) {
+ // return errors.New("unable to reconfigure server (cmd and pool changes are allowed)")
+ //}
+ //
+ //s.mu.Lock()
+ //previous := s.pool
+ //pWatcher := s.pController
+ //s.mu.Unlock()
+ //
+ //pool, err := NewPool(cfg.makeCommand(), s.factory, *cfg.Pool)
+ //if err != nil {
+ // return err
+ //}
+ //
+ //pool.Listen(s.poolListener)
+ //
+ //s.mu.Lock()
+ //s.cfg.Pool, s.pool = cfg.Pool, pool
+ //
+ //if s.controller != nil {
+ // s.pController = s.controller.Attach(pool)
+ //}
+ //
+ //s.mu.Unlock()
+ //
+ //s.throw(EventPoolConstruct, pool)
+ //
+ //if previous != nil {
+ // go func(previous Pool, pWatcher Controller) {
+ // s.throw(EventPoolDestruct, previous)
+ // if pWatcher != nil {
+ // pWatcher.Detach()
+ // }
+ //
+ // previous.Destroy()
+ // }(previous, pWatcher)
+ //}
+ //
+ //return nil
+ return nil
+}
diff --git a/plugins/http/response.go b/plugins/http/response.go
index 88848b9d..c3de434f 100644
--- a/plugins/http/response.go
+++ b/plugins/http/response.go
@@ -6,9 +6,9 @@ import (
"strings"
json "github.com/json-iterator/go"
+ "github.com/spiral/roadrunner/v2"
)
-
// Response handles PSR7 response logic.
type Response struct {
// Status contains response status.
@@ -22,7 +22,7 @@ type Response struct {
}
// NewResponse creates new response based on given rr payload.
-func NewResponse(p *roadrunner.Payload) (*Response, error) {
+func NewResponse(p roadrunner.Payload) (*Response, error) {
r := &Response{body: p.Body}
j := json.ConfigCompatibleWithStandardLibrary
if err := j.Unmarshal(p.Context, r); err != nil {
diff --git a/plugins/http/rpc.go b/plugins/http/rpc.go
index 7b38dece..56d8f1a1 100644
--- a/plugins/http/rpc.go
+++ b/plugins/http/rpc.go
@@ -1,34 +1,34 @@
package http
-import (
- "github.com/pkg/errors"
- "github.com/spiral/roadrunner/util"
-)
+//import (
+// "github.com/pkg/errors"
+// "github.com/spiral/roadrunner/util"
+//)
-type rpcServer struct{ svc *Service }
-
-// WorkerList contains list of workers.
-type WorkerList struct {
- // Workers is list of workers.
- Workers []*util.State `json:"workers"`
-}
-
-// Reset resets underlying RR worker pool and restarts all of it's workers.
-func (rpc *rpcServer) Reset(reset bool, r *string) error {
- if rpc.svc == nil || rpc.svc.handler == nil {
- return errors.New("http server is not running")
- }
-
- *r = "OK"
- return rpc.svc.Server().Reset()
-}
-
-// Workers returns list of active workers and their stats.
-func (rpc *rpcServer) Workers(list bool, r *WorkerList) (err error) {
- if rpc.svc == nil || rpc.svc.handler == nil {
- return errors.New("http server is not running")
- }
-
- r.Workers, err = util.ServerState(rpc.svc.Server())
- return err
-}
+//type rpcServer struct{ svc *Plugin }
+//
+//// WorkerList contains list of workers.
+//type WorkerList struct {
+// // Workers is list of workers.
+// Workers []*util.State `json:"workers"`
+//}
+//
+//// Reset resets underlying RR worker pool and restarts all of it's workers.
+//func (rpc *rpcServer) Reset(reset bool, r *string) error {
+// if rpc.svc == nil || rpc.svc.handler == nil {
+// return errors.New("http server is not running")
+// }
+//
+// *r = "OK"
+// return rpc.svc.Server().Reset()
+//}
+//
+//// Workers returns list of active workers and their stats.
+//func (rpc *rpcServer) Workers(list bool, r *WorkerList) (err error) {
+// if rpc.svc == nil || rpc.svc.handler == nil {
+// return errors.New("http server is not running")
+// }
+//
+// r.Workers, err = util.ServerState(rpc.svc.Server())
+// return err
+//}
diff --git a/plugins/http/test/.rr-http.yaml b/plugins/http/test/.rr-http.yaml
new file mode 100644
index 00000000..6fbfd378
--- /dev/null
+++ b/plugins/http/test/.rr-http.yaml
@@ -0,0 +1,37 @@
+server:
+ command: "php psr-worker.php"
+ user: ""
+ group: ""
+ env:
+ "RR_HTTP": "true"
+ relay: "pipes"
+ relayTimeout: "20s"
+
+http:
+ debug: true
+ address: 0.0.0.0:8080
+ maxRequestSize: 200
+ middleware: [ "" ]
+ uploads:
+ forbid: [ ".php", ".exe", ".bat" ]
+ trustedSubnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ]
+ pool:
+ numWorkers: 4
+ maxJobs: 0
+ allocateTimeout: 60s
+ destroyTimeout: 60s
+
+ # ssl:
+ # port: 443
+ # redirect: true
+ # cert: server.crt
+ # key: server.key
+ # rootCa: root.crt
+ fcgi:
+ address: tcp://0.0.0.0:6920
+ http2:
+ enabled: false
+ h2c: false
+ maxConcurrentStreams: 128
+
+
diff --git a/plugins/http/test/http_test.go b/plugins/http/test/http_test.go
index c109d930..07925d33 100644
--- a/plugins/http/test/http_test.go
+++ b/plugins/http/test/http_test.go
@@ -1,10 +1,18 @@
package test
import (
+ "os"
+ "os/signal"
+ "syscall"
"testing"
+ "time"
"github.com/spiral/endure"
"github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/http"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ //rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"
+ "github.com/spiral/roadrunner/v2/plugins/server"
"github.com/stretchr/testify/assert"
)
@@ -14,10 +22,51 @@ func TestHTTPInit(t *testing.T) {
cfg := &config.Viper{
Path: ".rr-http.yaml",
- Prefix: "",
+ Prefix: "rr",
}
+
err = cont.RegisterAll(
+ cfg,
+ //&rpcPlugin.Plugin{},
+ &logger.ZapLogger{},
+ &server.Plugin{},
+ &http.Plugin{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
- )
+ ch, err := cont.Serve()
assert.NoError(t, err)
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ tt := time.NewTimer(time.Minute * 3)
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-tt.C:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
}
diff --git a/plugins/http/test/psr-worker.php b/plugins/http/test/psr-worker.php
new file mode 100644
index 00000000..65fc6bde
--- /dev/null
+++ b/plugins/http/test/psr-worker.php
@@ -0,0 +1,23 @@
+<?php
+/**
+ * @var Goridge\RelayInterface $relay
+ */
+use Spiral\Goridge;
+use Spiral\RoadRunner;
+
+ini_set('display_errors', 'stderr');
+require dirname(__DIR__) . "/../../vendor_php/autoload.php";
+
+$worker = new RoadRunner\Worker(new Goridge\StreamRelay(STDIN, STDOUT));
+$psr7 = new RoadRunner\PSR7Client($worker);
+
+while ($req = $psr7->acceptRequest()) {
+ try {
+ $resp = new \Zend\Diactoros\Response();
+ $resp->getBody()->write("hello world");
+
+ $psr7->respond($resp);
+ } catch (\Throwable $e) {
+ $psr7->getWorker()->error((string)$e);
+ }
+} \ No newline at end of file
diff --git a/plugins/http/test/rr-http.yaml b/plugins/http/test/rr-http.yaml
deleted file mode 100644
index 8a04a1f1..00000000
--- a/plugins/http/test/rr-http.yaml
+++ /dev/null
@@ -1,34 +0,0 @@
-http:
- address: 0.0.0.0:8080
- maxRequestSize: 200
- middlewares: [ "" ]
- uploads:
- forbid: [ ".php", ".exe", ".bat" ]
- trustedSubnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ]
- workers:
- command: "php psr-worker.php pipes"
- user: ""
-
- # connection method (pipes, tcp://:9000, unix://socket.unix). default "pipes"
- relay: "pipes"
-
- pool:
- numWorkers: 4
- maxJobs: 0
- allocateTimeout: 60
- destroyTimeout: 60
-
- ssl:
- port: 443
- redirect: true
- cert: server.crt
- key: server.key
- rootCa: root.crt
- fcgi:
- address: tcp://0.0.0.0:6920
- http2:
- enabled: true
- h2c: true
- maxConcurrentStreams: 128
-
-
diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go
index e096708a..4d606390 100644
--- a/plugins/server/plugin.go
+++ b/plugins/server/plugin.go
@@ -25,49 +25,47 @@ type Plugin struct {
}
// Init application provider.
-func (app *Plugin) Init(cfg config.Configurer, log log.Logger) error {
+func (server *Plugin) Init(cfg config.Configurer, log log.Logger) error {
const op = errors.Op("Init")
- err := cfg.UnmarshalKey(ServiceName, &app.cfg)
+ err := cfg.UnmarshalKey(ServiceName, &server.cfg)
if err != nil {
return errors.E(op, errors.Init, err)
}
- app.cfg.InitDefaults()
- app.log = log
+ server.cfg.InitDefaults()
+ server.log = log
+
+ server.factory, err = server.initFactory()
+ if err != nil {
+ return errors.E(errors.Op("Init factory"), err)
+ }
return nil
}
// Name contains service name.
-func (app *Plugin) Name() string {
+func (server *Plugin) Name() string {
return ServiceName
}
-func (app *Plugin) Serve() chan error {
+func (server *Plugin) Serve() chan error {
errCh := make(chan error, 1)
- var err error
-
- app.factory, err = app.initFactory()
- if err != nil {
- errCh <- errors.E(errors.Op("init factory"), err)
- }
-
return errCh
}
-func (app *Plugin) Stop() error {
- if app.factory == nil {
+func (server *Plugin) Stop() error {
+ if server.factory == nil {
return nil
}
- return app.factory.Close(context.Background())
+ return server.factory.Close(context.Background())
}
// CmdFactory provides worker command factory assocated with given context.
-func (app *Plugin) CmdFactory(env server.Env) (func() *exec.Cmd, error) {
+func (server *Plugin) CmdFactory(env server.Env) (func() *exec.Cmd, error) {
var cmdArgs []string
// create command according to the config
- cmdArgs = append(cmdArgs, strings.Split(app.cfg.Command, " ")...)
+ cmdArgs = append(cmdArgs, strings.Split(server.cfg.Command, " ")...)
return func() *exec.Cmd {
cmd := exec.Command(cmdArgs[0], cmdArgs[1:]...)
@@ -75,67 +73,67 @@ func (app *Plugin) CmdFactory(env server.Env) (func() *exec.Cmd, error) {
// if user is not empty, and OS is linux or macos
// execute php worker from that particular user
- if app.cfg.User != "" {
- err := util.ExecuteFromUser(cmd, app.cfg.User)
+ if server.cfg.User != "" {
+ err := util.ExecuteFromUser(cmd, server.cfg.User)
if err != nil {
return nil
}
}
- cmd.Env = app.setEnv(env)
+ cmd.Env = server.setEnv(env)
return cmd
}, nil
}
// NewWorker issues new standalone worker.
-func (app *Plugin) NewWorker(ctx context.Context, env server.Env) (roadrunner.WorkerBase, error) {
+func (server *Plugin) NewWorker(ctx context.Context, env server.Env) (roadrunner.WorkerBase, error) {
const op = errors.Op("new worker")
- spawnCmd, err := app.CmdFactory(env)
+ spawnCmd, err := server.CmdFactory(env)
if err != nil {
return nil, errors.E(op, err)
}
- w, err := app.factory.SpawnWorkerWithContext(ctx, spawnCmd())
+ w, err := server.factory.SpawnWorkerWithContext(ctx, spawnCmd())
if err != nil {
return nil, errors.E(op, err)
}
- w.AddListener(app.collectLogs)
+ w.AddListener(server.collectLogs)
return w, nil
}
// NewWorkerPool issues new worker pool.
-func (app *Plugin) NewWorkerPool(ctx context.Context, opt roadrunner.PoolConfig, env server.Env) (roadrunner.Pool, error) {
- spawnCmd, err := app.CmdFactory(env)
+func (server *Plugin) NewWorkerPool(ctx context.Context, opt roadrunner.PoolConfig, env server.Env) (roadrunner.Pool, error) {
+ spawnCmd, err := server.CmdFactory(env)
if err != nil {
return nil, err
}
- p, err := roadrunner.NewPool(ctx, spawnCmd, app.factory, opt)
+ p, err := roadrunner.NewPool(ctx, spawnCmd, server.factory, opt)
if err != nil {
return nil, err
}
- p.AddListener(app.collectLogs)
+ p.AddListener(server.collectLogs)
return p, nil
}
// creates relay and worker factory.
-func (app *Plugin) initFactory() (roadrunner.Factory, error) {
+func (server *Plugin) initFactory() (roadrunner.Factory, error) {
const op = errors.Op("network factory init")
- if app.cfg.Relay == "" || app.cfg.Relay == "pipes" {
+ if server.cfg.Relay == "" || server.cfg.Relay == "pipes" {
return roadrunner.NewPipeFactory(), nil
}
- dsn := strings.Split(app.cfg.Relay, "://")
+ dsn := strings.Split(server.cfg.Relay, "://")
if len(dsn) != 2 {
return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)"))
}
- lsn, err := util.CreateListener(app.cfg.Relay)
+ lsn, err := util.CreateListener(server.cfg.Relay)
if err != nil {
return nil, errors.E(op, errors.Network, err)
}
@@ -143,16 +141,16 @@ func (app *Plugin) initFactory() (roadrunner.Factory, error) {
switch dsn[0] {
// sockets group
case "unix":
- return roadrunner.NewSocketServer(lsn, app.cfg.RelayTimeout), nil
+ return roadrunner.NewSocketServer(lsn, server.cfg.RelayTimeout), nil
case "tcp":
- return roadrunner.NewSocketServer(lsn, app.cfg.RelayTimeout), nil
+ return roadrunner.NewSocketServer(lsn, server.cfg.RelayTimeout), nil
default:
return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)"))
}
}
-func (app *Plugin) setEnv(e server.Env) []string {
- env := append(os.Environ(), fmt.Sprintf("RR_RELAY=%s", app.cfg.Relay))
+func (server *Plugin) setEnv(e server.Env) []string {
+ env := append(os.Environ(), fmt.Sprintf("RR_RELAY=%s", server.cfg.Relay))
for k, v := range e {
env = append(env, fmt.Sprintf("%s=%s", strings.ToUpper(k), v))
}
@@ -160,13 +158,13 @@ func (app *Plugin) setEnv(e server.Env) []string {
return env
}
-func (app *Plugin) collectLogs(event interface{}) {
+func (server *Plugin) collectLogs(event interface{}) {
if we, ok := event.(roadrunner.WorkerEvent); ok {
switch we.Event {
case roadrunner.EventWorkerError:
- app.log.Error(we.Payload.(error).Error(), "pid", we.Worker.Pid())
+ server.log.Error(we.Payload.(error).Error(), "pid", we.Worker.Pid())
case roadrunner.EventWorkerLog:
- app.log.Debug(strings.TrimRight(string(we.Payload.([]byte)), " \n\t"), "pid", we.Worker.Pid())
+ server.log.Debug(strings.TrimRight(string(we.Payload.([]byte)), " \n\t"), "pid", we.Worker.Pid())
}
}
}