diff options
Diffstat (limited to 'plugins/service')
-rw-r--r-- | plugins/service/config.go | 34 | ||||
-rw-r--r-- | plugins/service/plugin.go | 110 | ||||
-rw-r--r-- | plugins/service/process.go | 147 |
3 files changed, 0 insertions, 291 deletions
diff --git a/plugins/service/config.go b/plugins/service/config.go deleted file mode 100644 index 871c8f76..00000000 --- a/plugins/service/config.go +++ /dev/null @@ -1,34 +0,0 @@ -package service - -import "time" - -// Service represents particular service configuration -type Service struct { - Command string `mapstructure:"command"` - ProcessNum int `mapstructure:"process_num"` - ExecTimeout time.Duration `mapstructure:"exec_timeout"` - RemainAfterExit bool `mapstructure:"remain_after_exit"` - RestartSec uint64 `mapstructure:"restart_sec"` -} - -// Config for the services -type Config struct { - Services map[string]Service `mapstructure:"service"` -} - -func (c *Config) InitDefault() { - if len(c.Services) > 0 { - for k, v := range c.Services { - if v.ProcessNum == 0 { - val := c.Services[k] - val.ProcessNum = 1 - c.Services[k] = val - } - if v.RestartSec == 0 { - val := c.Services[k] - val.RestartSec = 30 - c.Services[k] = val - } - } - } -} diff --git a/plugins/service/plugin.go b/plugins/service/plugin.go deleted file mode 100644 index 3bd0f956..00000000 --- a/plugins/service/plugin.go +++ /dev/null @@ -1,110 +0,0 @@ -package service - -import ( - "sync" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/state/process" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -const PluginName string = "service" - -type Plugin struct { - sync.Mutex - - logger logger.Logger - cfg Config - - // all processes attached to the service - processes []*Process -} - -func (service *Plugin) Init(cfg config.Configurer, log logger.Logger) error { - const op = errors.Op("service_plugin_init") - if !cfg.Has(PluginName) { - return errors.E(errors.Disabled) - } - err := cfg.UnmarshalKey(PluginName, &service.cfg.Services) - if err != nil { - return errors.E(op, err) - } - - // init default parameters if not set by user - service.cfg.InitDefault() - // save the logger - service.logger = log - - return nil -} - -func (service *Plugin) Serve() chan error { - errCh := make(chan error, 1) - - // start processing - go func() { - // lock here, because Stop command might be invoked during the Serve - service.Lock() - defer service.Unlock() - - service.processes = make([]*Process, 0, len(service.cfg.Services)) - // for the every service - for k := range service.cfg.Services { - // create needed number of the processes - for i := 0; i < service.cfg.Services[k].ProcessNum; i++ { - // create processor structure, which will process all the services - service.processes = append(service.processes, NewServiceProcess( - service.cfg.Services[k].RemainAfterExit, - service.cfg.Services[k].ExecTimeout, - service.cfg.Services[k].RestartSec, - service.cfg.Services[k].Command, - service.logger, - errCh, - )) - } - } - - // start all processes - for i := 0; i < len(service.processes); i++ { - service.processes[i].start() - } - }() - - return errCh -} - -func (service *Plugin) Workers() []process.State { - service.Lock() - defer service.Unlock() - states := make([]process.State, 0, len(service.processes)) - for i := 0; i < len(service.processes); i++ { - st, err := process.GeneralProcessState(service.processes[i].Pid, service.processes[i].rawCmd) - if err != nil { - continue - } - states = append(states, st) - } - return states -} - -func (service *Plugin) Stop() error { - service.Lock() - defer service.Unlock() - - if len(service.processes) > 0 { - for i := 0; i < len(service.processes); i++ { - service.processes[i].stop() - } - } - return nil -} - -// Name contains service name. -func (service *Plugin) Name() string { - return PluginName -} - -// Available interface implementation -func (service *Plugin) Available() { -} diff --git a/plugins/service/process.go b/plugins/service/process.go deleted file mode 100644 index cac5c41e..00000000 --- a/plugins/service/process.go +++ /dev/null @@ -1,147 +0,0 @@ -package service - -import ( - "os/exec" - "strings" - "sync" - "sync/atomic" - "syscall" - "time" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/utils" -) - -// Process structure contains an information about process, restart information, log, errors, etc -type Process struct { - sync.Mutex - // command to execute - command *exec.Cmd - // rawCmd from the plugin - rawCmd string - Pid int - - // root plugin error chan - errCh chan error - // logger - log logger.Logger - - ExecTimeout time.Duration - RemainAfterExit bool - RestartSec uint64 - - // process start time - startTime time.Time - stopped uint64 -} - -// NewServiceProcess constructs service process structure -func NewServiceProcess(restartAfterExit bool, execTimeout time.Duration, restartDelay uint64, command string, l logger.Logger, errCh chan error) *Process { - return &Process{ - rawCmd: command, - RestartSec: restartDelay, - ExecTimeout: execTimeout, - RemainAfterExit: restartAfterExit, - errCh: errCh, - log: l, - } -} - -// write message to the log (stderr) -func (p *Process) Write(b []byte) (int, error) { - p.log.Info(utils.AsString(b)) - return len(b), nil -} - -func (p *Process) start() { - p.Lock() - defer p.Unlock() - const op = errors.Op("processor_start") - - // crate fat-process here - p.createProcess() - - // non blocking process start - err := p.command.Start() - if err != nil { - p.errCh <- errors.E(op, err) - return - } - - // start process waiting routine - go p.wait() - // execHandler checks for the execTimeout - go p.execHandler() - // save start time - p.startTime = time.Now() - p.Pid = p.command.Process.Pid -} - -// create command for the process -func (p *Process) createProcess() { - // cmdArgs contain command arguments if the command in form of: php <command> or ls <command> -i -b - var cmdArgs []string - cmdArgs = append(cmdArgs, strings.Split(p.rawCmd, " ")...) - if len(cmdArgs) < 2 { - p.command = exec.Command(p.rawCmd) //nolint:gosec - } else { - p.command = exec.Command(cmdArgs[0], cmdArgs[1:]...) //nolint:gosec - } - // redirect stderr into the Write function of the process.go - p.command.Stderr = p -} - -// wait process for exit -func (p *Process) wait() { - // Wait error doesn't matter here - err := p.command.Wait() - if err != nil { - p.log.Error("process wait error", "error", err) - } - // wait for restart delay - if p.RemainAfterExit { - // wait for the delay - time.Sleep(time.Second * time.Duration(p.RestartSec)) - // and start command again - p.start() - } -} - -// stop can be only sent by the Endure when plugin stopped -func (p *Process) stop() { - atomic.StoreUint64(&p.stopped, 1) -} - -func (p *Process) execHandler() { - tt := time.NewTicker(time.Second) - for range tt.C { - // lock here, because p.startTime could be changed during the check - p.Lock() - // if the exec timeout is set - if p.ExecTimeout != 0 { - // if stopped -> kill the process (SIGINT-> SIGKILL) and exit - if atomic.CompareAndSwapUint64(&p.stopped, 1, 1) { - err := p.command.Process.Signal(syscall.SIGINT) - if err != nil { - _ = p.command.Process.Signal(syscall.SIGKILL) - } - tt.Stop() - p.Unlock() - return - } - - // check the running time for the script - if time.Now().After(p.startTime.Add(p.ExecTimeout)) { - err := p.command.Process.Signal(syscall.SIGINT) - if err != nil { - _ = p.command.Process.Signal(syscall.SIGKILL) - } - p.Unlock() - tt.Stop() - return - } - } - p.Unlock() - } -} |