diff options
23 files changed, 360 insertions, 109 deletions
@@ -1,6 +1,5 @@ rpc: listen: tcp://127.0.0.1:6001 - disabled: false server: command: "php tests/psr-worker-bench.php" @@ -8,6 +7,7 @@ server: group: "" env: "RR_HTTP": "true" + "RR_RPC": "tcp://127.0.0.1:6001" relay: "pipes" relayTimeout: "20s" @@ -17,11 +17,11 @@ logs: http: address: 127.0.0.1:44933 - maxRequestSize: 1024 + max_request_size: 1024 middleware: ["gzip", "headers"] uploads: forbid: [".php", ".exe", ".bat"] - trustedSubnets: + trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", @@ -32,23 +32,34 @@ http: "fe80::/10", ] pool: - numWorkers: 6 - maxJobs: 0 - allocateTimeout: 60s - destroyTimeout: 60s + num_workers: 6 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + supervisor: + # WatchTick defines how often to check the state of worker (seconds) + watch_tick: 10 + # TTL defines maximum time worker is allowed to live (seconds) + ttl: 10 + # IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0 (seconds) + idle_ttl: 10 + # ExecTTL defines maximum lifetime per job (seconds) + exec_ttl: 10 + # MaxWorkerMemory limits memory per worker (MB) + max_worker_memory: 100 -# ssl: -# port: 8892 -# redirect: false -# cert: fixtures/server.crt -# key: fixtures/server.key -# # rootCa: root.crt -# fcgi: -# address: tcp://0.0.0.0:7921 -# http2: -# enabled: false -# h2c: false -# maxConcurrentStreams: 128 + ssl: + port: 8892 + redirect: false + cert: fixtures/server.crt + key: fixtures/server.key + # rootCa: root.crt + fcgi: + address: tcp://0.0.0.0:7921 + http2: + enabled: false + h2c: false + max_concurrent_streams: 128 redis: # UniversalClient is an abstract client which - based on the provided options - diff --git a/codecov.yml b/codecov.yml index 75e92a5f..eb499c3a 100644 --- a/codecov.yml +++ b/codecov.yml @@ -9,4 +9,11 @@ coverage: default: target: auto threshold: 0% - informational: true
\ No newline at end of file + informational: true + +# do not include tests folders +ignore: + - "tests" + - "plugins/kv/boltdb/plugin_unit_test.go" + - "plugins/kv/memcached/plugin_unit_test.go" + - "plugins/kv/memory/plugin_unit_test.go"
\ No newline at end of file diff --git a/pkg/pool/config.go b/pkg/pool/config.go index 3dcc3584..acdd3d6f 100644 --- a/pkg/pool/config.go +++ b/pkg/pool/config.go @@ -12,23 +12,23 @@ type Config struct { // NumWorkers defines how many sub-processes can be run at once. This value // might be doubled by Swapper while hot-swap. Defaults to number of CPU cores. - NumWorkers int64 + NumWorkers int64 `yaml:"num_workers"` // MaxJobs defines how many executions is allowed for the worker until // it's destruction. set 1 to create new process for each new task, 0 to let // worker handle as many tasks as it can. - MaxJobs int64 + MaxJobs int64 `yaml:"max_jobs"` // AllocateTimeout defines for how long pool will be waiting for a worker to // be freed to handle the task. Defaults to 60s. - AllocateTimeout time.Duration + AllocateTimeout time.Duration `yaml:"allocate_timeout"` // DestroyTimeout defines for how long pool should be waiting for worker to // properly destroy, if timeout reached worker will be killed. Defaults to 60s. - DestroyTimeout time.Duration + DestroyTimeout time.Duration `yaml:"destroy_timeout"` // Supervision config to limit worker and pool memory usage. - Supervisor *SupervisorConfig + Supervisor *SupervisorConfig `yaml:"supervisor"` } // InitDefaults enables default config values. @@ -52,19 +52,19 @@ func (cfg *Config) InitDefaults() { type SupervisorConfig struct { // WatchTick defines how often to check the state of worker. - WatchTick uint64 + WatchTick uint64 `yaml:"watch_tick"` // TTL defines maximum time worker is allowed to live. - TTL uint64 + TTL uint64 `yaml:"ttl"` // IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0. - IdleTTL uint64 + IdleTTL uint64 `yaml:"idle_ttl"` // ExecTTL defines maximum lifetime per job. - ExecTTL uint64 + ExecTTL uint64 `yaml:"exec_ttl"` // MaxWorkerMemory limits memory per worker. - MaxWorkerMemory uint64 + MaxWorkerMemory uint64 `yaml:"max_worker_memory"` } // InitDefaults enables default config values. diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go index 378be7dd..07fa7019 100755 --- a/pkg/pool/supervisor_pool.go +++ b/pkg/pool/supervisor_pool.go @@ -16,6 +16,9 @@ import ( const MB = 1024 * 1024 +// NSEC_IN_SEC nanoseconds in second +const NSEC_IN_SEC int64 = 1000000000 //nolint:golint,stylecheck + type Supervised interface { pool.Pool // Start used to start watching process for all pool workers @@ -54,7 +57,7 @@ func (sp *supervised) ExecWithContext(ctx context.Context, rqs payload.Payload) } c := make(chan ttlExec, 1) - ctx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(sp.cfg.ExecTTL)) + ctx, cancel := context.WithTimeout(ctx, time.Duration(sp.cfg.ExecTTL)*time.Second) defer cancel() go func() { res, err := sp.pool.ExecWithContext(ctx, rqs) @@ -114,7 +117,7 @@ func (sp *supervised) Destroy(ctx context.Context) { func (sp *supervised) Start() { go func() { - watchTout := time.NewTicker(time.Second * time.Duration(sp.cfg.WatchTick)) + watchTout := time.NewTicker(time.Duration(sp.cfg.WatchTick) * time.Second) for { select { case <-sp.stopCh: @@ -186,14 +189,28 @@ func (sp *supervised) control() { we are guessing that worker overlap idle time and has to be killed */ + // 1610530005534416045 lu + // lu - now = -7811150814 - nanoseconds + // 7.8 seconds // get last used unix nano lu := workers[i].State().LastUsed() + // worker not used, skip + if lu == 0 { + continue + } - // convert last used to unixNano and sub time.now - res := int64(lu) - now.UnixNano() + // convert last used to unixNano and sub time.now to seconds + // negative number, because lu always in the past, except for the `back to the future` :) + res := ((int64(lu) - now.UnixNano()) / NSEC_IN_SEC) * -1 // maxWorkerIdle more than diff between now and last used - if sp.cfg.IdleTTL-uint64(res) <= 0 { + // for example: + // After exec worker goes to the rest + // And resting for the 5 seconds + // IdleTTL is 1 second. + // After the control check, res will be 5, idle is 1 + // 5 - 1 = 4, more than 0, YOU ARE FIRED (removed). Done. + if int64(sp.cfg.IdleTTL)-res <= 0 { err = sp.pool.RemoveWorker(workers[i]) if err != nil { sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)}) diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go index cb67ebe1..72226bee 100644 --- a/pkg/pool/supervisor_test.go +++ b/pkg/pool/supervisor_test.go @@ -112,6 +112,47 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) { assert.NotEqual(t, pid, p.Workers()[0].Pid()) } +func TestSupervisedPool_Idle(t *testing.T) { + var cfgExecTTL = Config{ + NumWorkers: int64(1), + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + Supervisor: &SupervisorConfig{ + WatchTick: 1, + TTL: 100, + IdleTTL: 1, + ExecTTL: 100, + MaxWorkerMemory: 100, + }, + } + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") }, + pipe.NewPipeFactory(), + cfgExecTTL, + ) + + assert.NoError(t, err) + assert.NotNil(t, p) + defer p.Destroy(context.Background()) + + pid := p.Workers()[0].Pid() + + resp, err := p.ExecWithContext(context.Background(), payload.Payload{ + Context: []byte(""), + Body: []byte("foo"), + }) + + assert.Nil(t, err) + assert.Empty(t, resp.Body) + assert.Empty(t, resp.Context) + + time.Sleep(time.Second * 5) + // should be new worker with new pid + assert.NotEqual(t, pid, p.Workers()[0].Pid()) +} + func TestSupervisedPool_ExecTTL_OK(t *testing.T) { var cfgExecTTL = Config{ NumWorkers: int64(1), diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index 348f0459..127dc801 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -236,7 +236,7 @@ func (ww *workerWatcher) RemoveWorker(wb worker.BaseProcess) error { pid := wb.Pid() if ww.stack.FindAndRemoveByPid(pid) { - wb.State().Set(internal.StateInvalid) + wb.State().Set(internal.StateRemove) err := wb.Kill() if err != nil { return errors.E(op, err) @@ -244,7 +244,6 @@ func (ww *workerWatcher) RemoveWorker(wb worker.BaseProcess) error { return nil } - wb.State().Set(internal.StateRemove) return nil } diff --git a/plugins/config/interface.go b/plugins/config/interface.go index 2a7c67ce..23279f53 100644 --- a/plugins/config/interface.go +++ b/plugins/config/interface.go @@ -1,7 +1,7 @@ package config type Configurer interface { - // UnmarshalKey reads configuration section into configuration object. + // // UnmarshalKey takes a single key and unmarshals it into a Struct. // // func (h *HttpService) Init(cp config.Configurer) error { // h.config := &HttpConfig{} @@ -11,6 +11,10 @@ type Configurer interface { // } UnmarshalKey(name string, out interface{}) error + // Unmarshal unmarshals the config into a Struct. Make sure that the tags + // on the fields of the structure are properly set. + Unmarshal(out interface{}) error + // Get used to get config section Get(name string) interface{} diff --git a/plugins/config/plugin.go b/plugins/config/plugin.go index 1a170448..9cecf9f9 100755 --- a/plugins/config/plugin.go +++ b/plugins/config/plugin.go @@ -64,6 +64,15 @@ func (v *Viper) UnmarshalKey(name string, out interface{}) error { return nil } +func (v *Viper) Unmarshal(out interface{}) error { + const op = errors.Op("config unmarshal") + err := v.viper.Unmarshal(&out) + if err != nil { + return errors.E(op, err) + } + return nil +} + // Get raw config in a form of config section. func (v *Viper) Get(name string) interface{} { return v.viper.Get(name) diff --git a/plugins/http/config.go b/plugins/http/config.go index 3b670c86..abde8917 100644 --- a/plugins/http/config.go +++ b/plugins/http/config.go @@ -49,10 +49,10 @@ type Config struct { HTTP2 *HTTP2Config // MaxRequestSize specified max size for payload body in megabytes, set 0 to unlimited. - MaxRequestSize uint64 + MaxRequestSize uint64 `yaml:"max_request_size"` // TrustedSubnets declare IP subnets which are allowed to set ip using X-Real-Ip and X-Forwarded-For - TrustedSubnets []string + TrustedSubnets []string `yaml:"trusted_subnets"` // Uploads configures uploads configuration. Uploads *UploadsConfig @@ -85,7 +85,7 @@ type HTTP2Config struct { H2C bool // MaxConcurrentStreams defaults to 128. - MaxConcurrentStreams uint32 + MaxConcurrentStreams uint32 `yaml:"max_concurrent_streams"` } // InitDefaults sets default values for HTTP/2 configuration. diff --git a/plugins/kv/memcached/storage_test.go b/plugins/kv/memcached/plugin_unit_test.go index 3d37748b..3d37748b 100644 --- a/plugins/kv/memcached/storage_test.go +++ b/plugins/kv/memcached/plugin_unit_test.go diff --git a/plugins/kv/memory/storage_test.go b/plugins/kv/memory/plugin_unit_test.go index d3b24860..d3b24860 100644 --- a/plugins/kv/memory/storage_test.go +++ b/plugins/kv/memory/plugin_unit_test.go diff --git a/plugins/rpc/config.go b/plugins/rpc/config.go index d7531435..88ad7f0e 100644 --- a/plugins/rpc/config.go +++ b/plugins/rpc/config.go @@ -12,9 +12,6 @@ import ( type Config struct { // Listen string Listen string - - // Disabled disables RPC service. - Disabled bool } // InitDefaults allows to init blank config with pre-defined set of default values. diff --git a/plugins/rpc/plugin.go b/plugins/rpc/plugin.go index 6a83326b..c5813e7b 100644 --- a/plugins/rpc/plugin.go +++ b/plugins/rpc/plugin.go @@ -22,17 +22,18 @@ type pluggable struct { // Plugin is RPC service. type Plugin struct { - cfg Config - log logger.Logger - rpc *rpc.Server - services []pluggable + cfg Config + log logger.Logger + rpc *rpc.Server + // set of the plugins, which are implement RPCer interface and can be plugged into the RR via RPC + plugins []pluggable listener net.Listener closed *uint32 } // Init rpc service. Must return true if service is enabled. func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error { - const op = errors.Op("RPC plugin init") + const op = errors.Op("rpc plugin init") if !cfg.Has(PluginName) { return errors.E(op, errors.Disabled) } @@ -43,10 +44,6 @@ func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error { } s.cfg.InitDefaults() - if s.cfg.Disabled { - return errors.E(op, errors.Disabled) - } - s.log = log state := uint32(0) s.closed = &state @@ -57,22 +54,22 @@ func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error { // Serve serves the service. func (s *Plugin) Serve() chan error { - const op = errors.Op("register service") + const op = errors.Op("serve rpc plugin") errCh := make(chan error, 1) s.rpc = rpc.NewServer() - services := make([]string, 0, len(s.services)) + services := make([]string, 0, len(s.plugins)) // Attach all services - for i := 0; i < len(s.services); i++ { - err := s.Register(s.services[i].name, s.services[i].service.RPC()) + for i := 0; i < len(s.plugins); i++ { + err := s.Register(s.plugins[i].name, s.plugins[i].service.RPC()) if err != nil { errCh <- errors.E(op, err) return errCh } - services = append(services, s.services[i].name) + services = append(services, s.plugins[i].name) } var err error @@ -131,7 +128,7 @@ func (s *Plugin) Collects() []interface{} { // RegisterPlugin registers RPC service plugin. func (s *Plugin) RegisterPlugin(name endure.Named, p RPCer) { - s.services = append(s.services, pluggable{ + s.plugins = append(s.plugins, pluggable{ service: p, name: name.Name(), }) diff --git a/plugins/server/config.go b/plugins/server/config.go index 2bf30e70..a990efd3 100644 --- a/plugins/server/config.go +++ b/plugins/server/config.go @@ -4,37 +4,144 @@ import ( "time" ) -// Config config combines factory, pool and cmd configurations. +// All config (.rr.yaml) +// For other section use pointer to distinguish between `empty` and `not present` type Config struct { - // Command to run as application. - Command string + // Server config section + Server struct { + // Command to run as application. + Command string `yaml:"command"` + // User to run application under. + User string `yaml:"user"` + // Group to run application under. + Group string `yaml:"group"` + // Env represents application environment. + Env Env `yaml:"env"` + // Relay defines connection method and factory to be used to connect to workers: + // "pipes", "tcp://:6001", "unix://rr.sock" + // This config section must not change on re-configuration. + Relay string `yaml:"relay"` + // RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section + // must not change on re-configuration. Defaults to 60s. + RelayTimeout time.Duration `yaml:"relayTimeout"` + } `yaml:"server"` - // User to run application under. - User string - - // Group to run application under. - Group string - - // Env represents application environment. - Env Env - - // Listen defines connection method and factory to be used to connect to workers: - // "pipes", "tcp://:6001", "unix://rr.sock" - // This config section must not change on re-configuration. - Relay string - - // RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section - // must not change on re-configuration. Defaults to 60s. - RelayTimeout time.Duration + RPC *struct { + Listen string `yaml:"listen"` + } `yaml:"rpc"` + Logs *struct { + Mode string `yaml:"mode"` + Level string `yaml:"level"` + } `yaml:"logs"` + HTTP *struct { + Address string `yaml:"address"` + MaxRequestSize int `yaml:"max_request_size"` + Middleware []string `yaml:"middleware"` + Uploads struct { + Forbid []string `yaml:"forbid"` + } `yaml:"uploads"` + TrustedSubnets []string `yaml:"trusted_subnets"` + Pool struct { + NumWorkers int `yaml:"num_workers"` + MaxJobs int `yaml:"max_jobs"` + AllocateTimeout string `yaml:"allocate_timeout"` + DestroyTimeout string `yaml:"destroy_timeout"` + Supervisor struct { + WatchTick int `yaml:"watch_tick"` + TTL int `yaml:"ttl"` + IdleTTL int `yaml:"idle_ttl"` + ExecTTL int `yaml:"exec_ttl"` + MaxWorkerMemory int `yaml:"max_worker_memory"` + } `yaml:"supervisor"` + } `yaml:"pool"` + Ssl struct { + Port int `yaml:"port"` + Redirect bool `yaml:"redirect"` + Cert string `yaml:"cert"` + Key string `yaml:"key"` + } `yaml:"ssl"` + Fcgi struct { + Address string `yaml:"address"` + } `yaml:"fcgi"` + HTTP2 struct { + Enabled bool `yaml:"enabled"` + H2C bool `yaml:"h2c"` + MaxConcurrentStreams int `yaml:"max_concurrent_streams"` + } `yaml:"http2"` + } `yaml:"http"` + Redis *struct { + Addrs []string `yaml:"addrs"` + MasterName string `yaml:"master_name"` + Username string `yaml:"username"` + Password string `yaml:"password"` + DB int `yaml:"db"` + SentinelPassword string `yaml:"sentinel_password"` + RouteByLatency bool `yaml:"route_by_latency"` + RouteRandomly bool `yaml:"route_randomly"` + DialTimeout int `yaml:"dial_timeout"` + MaxRetries int `yaml:"max_retries"` + MinRetryBackoff int `yaml:"min_retry_backoff"` + MaxRetryBackoff int `yaml:"max_retry_backoff"` + PoolSize int `yaml:"pool_size"` + MinIdleConns int `yaml:"min_idle_conns"` + MaxConnAge int `yaml:"max_conn_age"` + ReadTimeout int `yaml:"read_timeout"` + WriteTimeout int `yaml:"write_timeout"` + PoolTimeout int `yaml:"pool_timeout"` + IdleTimeout int `yaml:"idle_timeout"` + IdleCheckFreq int `yaml:"idle_check_freq"` + ReadOnly bool `yaml:"read_only"` + } `yaml:"redis"` + Boltdb *struct { + Dir string `yaml:"dir"` + File string `yaml:"file"` + Bucket string `yaml:"bucket"` + Permissions int `yaml:"permissions"` + TTL int `yaml:"TTL"` + } `yaml:"boltdb"` + Memcached *struct { + Addr []string `yaml:"addr"` + } `yaml:"memcached"` + Memory *struct { + Enabled bool `yaml:"enabled"` + Interval int `yaml:"interval"` + } `yaml:"memory"` + Metrics *struct { + Address string `yaml:"address"` + Collect struct { + AppMetric struct { + Type string `yaml:"type"` + Help string `yaml:"help"` + Labels []string `yaml:"labels"` + Buckets []float64 `yaml:"buckets"` + Objectives []struct { + Num2 float64 `yaml:"2,omitempty"` + One4 float64 `yaml:"1.4,omitempty"` + } `yaml:"objectives"` + } `yaml:"app_metric"` + } `yaml:"collect"` + } `yaml:"metrics"` + Reload *struct { + Interval string `yaml:"interval"` + Patterns []string `yaml:"patterns"` + Services struct { + HTTP struct { + Recursive bool `yaml:"recursive"` + Ignore []string `yaml:"ignore"` + Patterns []string `yaml:"patterns"` + Dirs []string `yaml:"dirs"` + } `yaml:"http"` + } `yaml:"services"` + } `yaml:"reload"` } // InitDefaults for the server config func (cfg *Config) InitDefaults() { - if cfg.Relay == "" { - cfg.Relay = "pipes" + if cfg.Server.Relay == "" { + cfg.Server.Relay = "pipes" } - if cfg.RelayTimeout == 0 { - cfg.RelayTimeout = time.Second * 60 + if cfg.Server.RelayTimeout == 0 { + cfg.Server.RelayTimeout = time.Second * 60 } } diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go index 8d8a7694..565c80c4 100644 --- a/plugins/server/plugin.go +++ b/plugins/server/plugin.go @@ -24,6 +24,13 @@ import ( // PluginName for the server const PluginName = "server" +// RR_RELAY env variable key (internal) +const RR_RELAY = "RR_RELAY" //nolint:golint,stylecheck +// RR_RPC env variable key (internal) if the RPC presents +const RR_RPC = "" //nolint:golint,stylecheck +// RR_HTTP env variable key (internal) if the HTTP presents +const RR_HTTP = "false" //nolint:golint,stylecheck + // Plugin manages worker type Plugin struct { cfg Config @@ -34,7 +41,7 @@ type Plugin struct { // Init application provider. func (server *Plugin) Init(cfg config.Configurer, log logger.Logger) error { const op = errors.Op("Init") - err := cfg.UnmarshalKey(PluginName, &server.cfg) + err := cfg.Unmarshal(&server.cfg) if err != nil { return errors.E(op, errors.Init, err) } @@ -43,7 +50,7 @@ func (server *Plugin) Init(cfg config.Configurer, log logger.Logger) error { server.factory, err = server.initFactory() if err != nil { - return errors.E(errors.Op("Init factory"), err) + return errors.E(err) } return nil @@ -75,7 +82,7 @@ func (server *Plugin) CmdFactory(env Env) (func() *exec.Cmd, error) { var cmdArgs []string // create command according to the config - cmdArgs = append(cmdArgs, strings.Split(server.cfg.Command, " ")...) + cmdArgs = append(cmdArgs, strings.Split(server.cfg.Server.Command, " ")...) if len(cmdArgs) < 2 { return nil, errors.E(op, errors.Str("should be in form of `php <script>")) } @@ -93,8 +100,8 @@ func (server *Plugin) CmdFactory(env Env) (func() *exec.Cmd, error) { // if user is not empty, and OS is linux or macos // execute php worker from that particular user - if server.cfg.User != "" { - err := utils.ExecuteFromUser(cmd, server.cfg.User) + if server.cfg.Server.User != "" { + err := utils.ExecuteFromUser(cmd, server.cfg.Server.User) if err != nil { return nil } @@ -150,17 +157,17 @@ func (server *Plugin) NewWorkerPool(ctx context.Context, opt poolImpl.Config, en // creates relay and worker factory. func (server *Plugin) initFactory() (worker.Factory, error) { - const op = errors.Op("network factory init") - if server.cfg.Relay == "" || server.cfg.Relay == "pipes" { + const op = errors.Op("server factory init") + if server.cfg.Server.Relay == "" || server.cfg.Server.Relay == "pipes" { return pipe.NewPipeFactory(), nil } - dsn := strings.Split(server.cfg.Relay, "://") + dsn := strings.Split(server.cfg.Server.Relay, "://") if len(dsn) != 2 { return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)")) } - lsn, err := utils.CreateListener(server.cfg.Relay) + lsn, err := utils.CreateListener(server.cfg.Server.Relay) if err != nil { return nil, errors.E(op, errors.Network, err) } @@ -168,20 +175,35 @@ func (server *Plugin) initFactory() (worker.Factory, error) { switch dsn[0] { // sockets group case "unix": - return socket.NewSocketServer(lsn, server.cfg.RelayTimeout), nil + return socket.NewSocketServer(lsn, server.cfg.Server.RelayTimeout), nil case "tcp": - return socket.NewSocketServer(lsn, server.cfg.RelayTimeout), nil + return socket.NewSocketServer(lsn, server.cfg.Server.RelayTimeout), nil default: return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)")) } } func (server *Plugin) setEnv(e Env) []string { - env := append(os.Environ(), fmt.Sprintf("RR_RELAY=%s", server.cfg.Relay)) + env := append(os.Environ(), fmt.Sprintf(RR_RELAY+"=%s", server.cfg.Server.Relay)) for k, v := range e { env = append(env, fmt.Sprintf("%s=%s", strings.ToUpper(k), v)) } + // set internal env variables + if server.cfg.HTTP != nil { + env = append(env, fmt.Sprintf("%s=%s", RR_HTTP, "true")) + } + if server.cfg.RPC != nil && server.cfg.RPC.Listen != "" { + env = append(env, fmt.Sprintf("%s=%s", RR_RPC, server.cfg.RPC.Listen)) + } + + // set env variables from the config + if len(server.cfg.Server.Env) > 0 { + for k, v := range server.cfg.Server.Env { + env = append(env, fmt.Sprintf("%s=%s", strings.ToUpper(k), v)) + } + } + return env } @@ -195,7 +217,7 @@ func (server *Plugin) collectPoolLogs(event interface{}) { case events.EventPoolError: server.log.Info("pool error", "error", we.Payload.(error).Error()) case events.EventSupervisorError: - server.log.Info("pool supervizor error", "error", we.Payload.(error).Error()) + server.log.Info("pool supervisor error", "error", we.Payload.(error).Error()) case events.EventTTL: server.log.Info("worker TTL reached", "pid", we.Payload.(worker.BaseProcess).Pid()) case events.EventWorkerConstruct: diff --git a/tests/plugins/config/.rr.yaml b/tests/plugins/config/.rr.yaml index 732a1366..bad2846a 100755 --- a/tests/plugins/config/.rr.yaml +++ b/tests/plugins/config/.rr.yaml @@ -1,3 +1,6 @@ +rpc: + listen: tcp://localhost:6060 + reload: enabled: true interval: 1s diff --git a/tests/plugins/config/config_test.go b/tests/plugins/config/config_test.go index 858fcb80..6d95ba70 100755 --- a/tests/plugins/config/config_test.go +++ b/tests/plugins/config/config_test.go @@ -44,6 +44,7 @@ func TestViperProvider_Init(t *testing.T) { signal.Notify(c, os.Interrupt) tt := time.NewTicker(time.Second * 2) + defer tt.Stop() for { select { @@ -53,12 +54,9 @@ func TestViperProvider_Init(t *testing.T) { return case <-c: er := container.Stop() - if er != nil { - panic(er) - } + assert.NoError(t, er) return case <-tt.C: - tt.Stop() assert.NoError(t, container.Stop()) return } diff --git a/tests/plugins/config/plugin1.go b/tests/plugins/config/plugin1.go index 2afe79a4..a6c06aec 100755 --- a/tests/plugins/config/plugin1.go +++ b/tests/plugins/config/plugin1.go @@ -1,12 +1,41 @@ package config import ( - "errors" "time" + "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/plugins/config" ) +type AllConfig struct { + RPC struct { + Listen string `yaml:"listen"` + } `yaml:"rpc"` + Reload struct { + Enabled bool `yaml:"enabled"` + Interval string `yaml:"interval"` + Patterns []string `yaml:"patterns"` + Services struct { + HTTP struct { + Recursive bool `yaml:"recursive"` + Ignore []string `yaml:"ignore"` + Patterns []string `yaml:"patterns"` + Dirs []string `yaml:"dirs"` + } `yaml:"http"` + Jobs struct { + Recursive bool `yaml:"recursive"` + Ignore []string `yaml:"ignore"` + Dirs []string `yaml:"dirs"` + } `yaml:"jobs"` + RPC struct { + Recursive bool `yaml:"recursive"` + Patterns []string `yaml:"patterns"` + Dirs []string `yaml:"dirs"` + } `yaml:"rpc"` + } `yaml:"services"` + } `yaml:"reload"` +} + // ReloadConfig is a Reload configuration point. type ReloadConfig struct { Interval time.Duration @@ -33,6 +62,7 @@ func (f *Foo) Init(p config.Configurer) error { } func (f *Foo) Serve() chan error { + const op = errors.Op("foo serve") errCh := make(chan error, 1) r := &ReloadConfig{} @@ -42,7 +72,20 @@ func (f *Foo) Serve() chan error { } if len(r.Patterns) == 0 { - errCh <- errors.New("should be at least one pattern, but got 0") + errCh <- errors.E(op, errors.Str("should be at least one pattern, but got 0")) + return errCh + } + + var allCfg AllConfig + err = f.configProvider.Unmarshal(&allCfg) + if err != nil { + errCh <- errors.E(op, errors.Str("should be at least one pattern, but got 0")) + return errCh + } + + if allCfg.RPC.Listen != "tcp://localhost:6060" { + errCh <- errors.E(op, errors.Str("RPC.Listen should be parsed")) + return errCh } return errCh diff --git a/tests/plugins/kv/boltdb/plugin_test.go b/tests/plugins/kv/boltdb/plugin_test.go index ba9b695a..5548402d 100644 --- a/tests/plugins/kv/boltdb/plugin_test.go +++ b/tests/plugins/kv/boltdb/plugin_test.go @@ -172,7 +172,7 @@ func testRPCMethods(t *testing.T) { assert.Len(t, ttlRes, 3) // HAS AFTER TTL - time.Sleep(time.Second * 11) + time.Sleep(time.Second * 15) ret = make(map[string]bool) keys = []string{"a", "b", "d"} err = client.Call("boltdb.Has", keys, &ret) diff --git a/tests/plugins/kv/memcached/plugin_test.go b/tests/plugins/kv/memcached/plugin_test.go index 6eff8715..d4cb58bb 100644 --- a/tests/plugins/kv/memcached/plugin_test.go +++ b/tests/plugins/kv/memcached/plugin_test.go @@ -172,7 +172,7 @@ func testRPCMethods(t *testing.T) { assert.Len(t, ttlRes, 0) // HAS AFTER TTL - time.Sleep(time.Second * 11) + time.Sleep(time.Second * 15) ret = make(map[string]bool) keys = []string{"a", "b", "d"} err = client.Call("memcached.Has", keys, &ret) diff --git a/tests/plugins/kv/memory/plugin_test.go b/tests/plugins/kv/memory/plugin_test.go index c6f94602..ee01fabb 100644 --- a/tests/plugins/kv/memory/plugin_test.go +++ b/tests/plugins/kv/memory/plugin_test.go @@ -172,7 +172,7 @@ func testRPCMethods(t *testing.T) { assert.Len(t, ttlRes, 3) // HAS AFTER TTL - time.Sleep(time.Second * 11) + time.Sleep(time.Second * 15) ret = make(map[string]bool) keys = []string{"a", "b", "d"} err = client.Call("memory.Has", keys, &ret) diff --git a/tests/plugins/rpc/configs/.rr-rpc-disabled.yaml b/tests/plugins/rpc/configs/.rr-rpc-disabled.yaml index d5c185e7..5ab359d3 100644 --- a/tests/plugins/rpc/configs/.rr-rpc-disabled.yaml +++ b/tests/plugins/rpc/configs/.rr-rpc-disabled.yaml @@ -1,6 +1,3 @@ -rpc: - listen: tcp://127.0.0.1:6001 - disabled: true logs: mode: development level: error
\ No newline at end of file diff --git a/tests/plugins/rpc/configs/.rr.yaml b/tests/plugins/rpc/configs/.rr.yaml index d2cb6c70..67d935e3 100644 --- a/tests/plugins/rpc/configs/.rr.yaml +++ b/tests/plugins/rpc/configs/.rr.yaml @@ -1,6 +1,5 @@ rpc: listen: tcp://127.0.0.1:6001 - disabled: false logs: mode: development level: error
\ No newline at end of file |