diff options
Diffstat (limited to 'plugins/service')
-rw-r--r-- | plugins/service/plugin.go | 6 | ||||
-rw-r--r-- | plugins/service/process.go | 39 |
2 files changed, 31 insertions, 14 deletions
diff --git a/plugins/service/plugin.go b/plugins/service/plugin.go index bc72dbaf..60ed46c3 100644 --- a/plugins/service/plugin.go +++ b/plugins/service/plugin.go @@ -43,6 +43,10 @@ func (service *Plugin) Serve() chan error { // start processing go func() { + // lock here, because Stop command might be invoked during the Serve + service.Lock() + defer service.Unlock() + service.processes = make([]*Process, 0, len(service.cfg.Services)) // for the every service for k := range service.cfg.Services { @@ -60,11 +64,9 @@ func (service *Plugin) Serve() chan error { } } - service.Lock() for i := 0; i < len(service.processes); i++ { service.processes[i].start() } - service.Unlock() }() return errCh diff --git a/plugins/service/process.go b/plugins/service/process.go index 5a5eb32b..040f88e0 100644 --- a/plugins/service/process.go +++ b/plugins/service/process.go @@ -4,6 +4,7 @@ import ( "os/exec" "strings" "sync" + "sync/atomic" "syscall" "time" "unsafe" @@ -17,18 +18,21 @@ type Process struct { sync.Mutex // command to execute command *exec.Cmd + // rawCmd from the plugin rawCmd string + // root plugin error chan errCh chan error + // logger log logger.Logger ExecTimeout time.Duration RestartAfterExit bool RestartDelay time.Duration - // + // process start time startTime time.Time - stopCh chan struct{} + stopped uint64 } // NewServiceProcess constructs service process structure @@ -39,7 +43,6 @@ func NewServiceProcess(restartAfterExit bool, execTimeout, restartDelay time.Dur ExecTimeout: execTimeout, RestartAfterExit: restartAfterExit, errCh: errCh, - stopCh: make(chan struct{}), log: l, } } @@ -53,24 +56,27 @@ func (p *Process) Write(b []byte) (int, error) { func (p *Process) start() { p.Lock() defer p.Unlock() - const op = errors.Op("processor_start") // crate fat-process here p.createProcess() + // non blocking process start err := p.command.Start() if err != nil { p.errCh <- errors.E(op, err) return } + // start process waiting routine go p.wait() + // startExec go p.execHandler() // save start time p.startTime = time.Now() } +// create command for the process func (p *Process) createProcess() { // cmdArgs contain command arguments if the command in form of: php <command> or ls <command> -i -b var cmdArgs []string @@ -80,9 +86,11 @@ func (p *Process) createProcess() { } else { p.command = exec.Command(cmdArgs[0], cmdArgs[1:]...) //nolint:gosec } + // redirect stderr into the Write function of the process.go p.command.Stderr = p } +// wait process for exit func (p *Process) wait() { // Wait error doesn't matter here _ = p.command.Wait() @@ -98,7 +106,7 @@ func (p *Process) wait() { // stop can be only sent by the Endure when plugin stopped func (p *Process) stop() { - p.stopCh <- struct{}{} + atomic.StoreUint64(&p.stopped, 1) } func (p *Process) execHandler() { @@ -110,22 +118,29 @@ func (p *Process) execHandler() { p.Lock() // if the exec timeout is set if p.ExecTimeout != 0 { + // if stopped -> kill the process (SIGINT-> SIGKILL) and exit + if atomic.CompareAndSwapUint64(&p.stopped, 1, 1) { + err := p.command.Process.Signal(syscall.SIGINT) + if err != nil { + _ = p.command.Process.Signal(syscall.SIGKILL) + } + tt.Stop() + p.Unlock() + return + } + // check the running time for the script if time.Now().After(p.startTime.Add(p.ExecTimeout)) { err := p.command.Process.Signal(syscall.SIGINT) if err != nil { _ = p.command.Process.Signal(syscall.SIGKILL) } + p.Unlock() + tt.Stop() + return } } p.Unlock() - case <-p.stopCh: - err := p.command.Process.Signal(syscall.SIGINT) - if err != nil { - _ = p.command.Process.Signal(syscall.SIGKILL) - } - tt.Stop() - return } } } |