summaryrefslogtreecommitdiff
path: root/plugins/service
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/service')
-rw-r--r--plugins/service/config.go34
-rw-r--r--plugins/service/plugin.go110
-rw-r--r--plugins/service/process.go147
3 files changed, 0 insertions, 291 deletions
diff --git a/plugins/service/config.go b/plugins/service/config.go
deleted file mode 100644
index 871c8f76..00000000
--- a/plugins/service/config.go
+++ /dev/null
@@ -1,34 +0,0 @@
-package service
-
-import "time"
-
-// Service represents particular service configuration
-type Service struct {
- Command string `mapstructure:"command"`
- ProcessNum int `mapstructure:"process_num"`
- ExecTimeout time.Duration `mapstructure:"exec_timeout"`
- RemainAfterExit bool `mapstructure:"remain_after_exit"`
- RestartSec uint64 `mapstructure:"restart_sec"`
-}
-
-// Config for the services
-type Config struct {
- Services map[string]Service `mapstructure:"service"`
-}
-
-func (c *Config) InitDefault() {
- if len(c.Services) > 0 {
- for k, v := range c.Services {
- if v.ProcessNum == 0 {
- val := c.Services[k]
- val.ProcessNum = 1
- c.Services[k] = val
- }
- if v.RestartSec == 0 {
- val := c.Services[k]
- val.RestartSec = 30
- c.Services[k] = val
- }
- }
- }
-}
diff --git a/plugins/service/plugin.go b/plugins/service/plugin.go
deleted file mode 100644
index 3bd0f956..00000000
--- a/plugins/service/plugin.go
+++ /dev/null
@@ -1,110 +0,0 @@
-package service
-
-import (
- "sync"
-
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/state/process"
- "github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/logger"
-)
-
-const PluginName string = "service"
-
-type Plugin struct {
- sync.Mutex
-
- logger logger.Logger
- cfg Config
-
- // all processes attached to the service
- processes []*Process
-}
-
-func (service *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
- const op = errors.Op("service_plugin_init")
- if !cfg.Has(PluginName) {
- return errors.E(errors.Disabled)
- }
- err := cfg.UnmarshalKey(PluginName, &service.cfg.Services)
- if err != nil {
- return errors.E(op, err)
- }
-
- // init default parameters if not set by user
- service.cfg.InitDefault()
- // save the logger
- service.logger = log
-
- return nil
-}
-
-func (service *Plugin) Serve() chan error {
- errCh := make(chan error, 1)
-
- // start processing
- go func() {
- // lock here, because Stop command might be invoked during the Serve
- service.Lock()
- defer service.Unlock()
-
- service.processes = make([]*Process, 0, len(service.cfg.Services))
- // for the every service
- for k := range service.cfg.Services {
- // create needed number of the processes
- for i := 0; i < service.cfg.Services[k].ProcessNum; i++ {
- // create processor structure, which will process all the services
- service.processes = append(service.processes, NewServiceProcess(
- service.cfg.Services[k].RemainAfterExit,
- service.cfg.Services[k].ExecTimeout,
- service.cfg.Services[k].RestartSec,
- service.cfg.Services[k].Command,
- service.logger,
- errCh,
- ))
- }
- }
-
- // start all processes
- for i := 0; i < len(service.processes); i++ {
- service.processes[i].start()
- }
- }()
-
- return errCh
-}
-
-func (service *Plugin) Workers() []process.State {
- service.Lock()
- defer service.Unlock()
- states := make([]process.State, 0, len(service.processes))
- for i := 0; i < len(service.processes); i++ {
- st, err := process.GeneralProcessState(service.processes[i].Pid, service.processes[i].rawCmd)
- if err != nil {
- continue
- }
- states = append(states, st)
- }
- return states
-}
-
-func (service *Plugin) Stop() error {
- service.Lock()
- defer service.Unlock()
-
- if len(service.processes) > 0 {
- for i := 0; i < len(service.processes); i++ {
- service.processes[i].stop()
- }
- }
- return nil
-}
-
-// Name contains service name.
-func (service *Plugin) Name() string {
- return PluginName
-}
-
-// Available interface implementation
-func (service *Plugin) Available() {
-}
diff --git a/plugins/service/process.go b/plugins/service/process.go
deleted file mode 100644
index cac5c41e..00000000
--- a/plugins/service/process.go
+++ /dev/null
@@ -1,147 +0,0 @@
-package service
-
-import (
- "os/exec"
- "strings"
- "sync"
- "sync/atomic"
- "syscall"
- "time"
-
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/spiral/roadrunner/v2/utils"
-)
-
-// Process structure contains an information about process, restart information, log, errors, etc
-type Process struct {
- sync.Mutex
- // command to execute
- command *exec.Cmd
- // rawCmd from the plugin
- rawCmd string
- Pid int
-
- // root plugin error chan
- errCh chan error
- // logger
- log logger.Logger
-
- ExecTimeout time.Duration
- RemainAfterExit bool
- RestartSec uint64
-
- // process start time
- startTime time.Time
- stopped uint64
-}
-
-// NewServiceProcess constructs service process structure
-func NewServiceProcess(restartAfterExit bool, execTimeout time.Duration, restartDelay uint64, command string, l logger.Logger, errCh chan error) *Process {
- return &Process{
- rawCmd: command,
- RestartSec: restartDelay,
- ExecTimeout: execTimeout,
- RemainAfterExit: restartAfterExit,
- errCh: errCh,
- log: l,
- }
-}
-
-// write message to the log (stderr)
-func (p *Process) Write(b []byte) (int, error) {
- p.log.Info(utils.AsString(b))
- return len(b), nil
-}
-
-func (p *Process) start() {
- p.Lock()
- defer p.Unlock()
- const op = errors.Op("processor_start")
-
- // crate fat-process here
- p.createProcess()
-
- // non blocking process start
- err := p.command.Start()
- if err != nil {
- p.errCh <- errors.E(op, err)
- return
- }
-
- // start process waiting routine
- go p.wait()
- // execHandler checks for the execTimeout
- go p.execHandler()
- // save start time
- p.startTime = time.Now()
- p.Pid = p.command.Process.Pid
-}
-
-// create command for the process
-func (p *Process) createProcess() {
- // cmdArgs contain command arguments if the command in form of: php <command> or ls <command> -i -b
- var cmdArgs []string
- cmdArgs = append(cmdArgs, strings.Split(p.rawCmd, " ")...)
- if len(cmdArgs) < 2 {
- p.command = exec.Command(p.rawCmd) //nolint:gosec
- } else {
- p.command = exec.Command(cmdArgs[0], cmdArgs[1:]...) //nolint:gosec
- }
- // redirect stderr into the Write function of the process.go
- p.command.Stderr = p
-}
-
-// wait process for exit
-func (p *Process) wait() {
- // Wait error doesn't matter here
- err := p.command.Wait()
- if err != nil {
- p.log.Error("process wait error", "error", err)
- }
- // wait for restart delay
- if p.RemainAfterExit {
- // wait for the delay
- time.Sleep(time.Second * time.Duration(p.RestartSec))
- // and start command again
- p.start()
- }
-}
-
-// stop can be only sent by the Endure when plugin stopped
-func (p *Process) stop() {
- atomic.StoreUint64(&p.stopped, 1)
-}
-
-func (p *Process) execHandler() {
- tt := time.NewTicker(time.Second)
- for range tt.C {
- // lock here, because p.startTime could be changed during the check
- p.Lock()
- // if the exec timeout is set
- if p.ExecTimeout != 0 {
- // if stopped -> kill the process (SIGINT-> SIGKILL) and exit
- if atomic.CompareAndSwapUint64(&p.stopped, 1, 1) {
- err := p.command.Process.Signal(syscall.SIGINT)
- if err != nil {
- _ = p.command.Process.Signal(syscall.SIGKILL)
- }
- tt.Stop()
- p.Unlock()
- return
- }
-
- // check the running time for the script
- if time.Now().After(p.startTime.Add(p.ExecTimeout)) {
- err := p.command.Process.Signal(syscall.SIGINT)
- if err != nil {
- _ = p.command.Process.Signal(syscall.SIGKILL)
- }
- p.Unlock()
- tt.Stop()
- return
- }
- }
- p.Unlock()
- }
-}