summaryrefslogtreecommitdiff
path: root/plugins/service
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/service')
-rw-r--r--plugins/service/plugin.go6
-rw-r--r--plugins/service/process.go39
2 files changed, 31 insertions, 14 deletions
diff --git a/plugins/service/plugin.go b/plugins/service/plugin.go
index bc72dbaf..60ed46c3 100644
--- a/plugins/service/plugin.go
+++ b/plugins/service/plugin.go
@@ -43,6 +43,10 @@ func (service *Plugin) Serve() chan error {
// start processing
go func() {
+ // lock here, because Stop command might be invoked during the Serve
+ service.Lock()
+ defer service.Unlock()
+
service.processes = make([]*Process, 0, len(service.cfg.Services))
// for the every service
for k := range service.cfg.Services {
@@ -60,11 +64,9 @@ func (service *Plugin) Serve() chan error {
}
}
- service.Lock()
for i := 0; i < len(service.processes); i++ {
service.processes[i].start()
}
- service.Unlock()
}()
return errCh
diff --git a/plugins/service/process.go b/plugins/service/process.go
index 5a5eb32b..040f88e0 100644
--- a/plugins/service/process.go
+++ b/plugins/service/process.go
@@ -4,6 +4,7 @@ import (
"os/exec"
"strings"
"sync"
+ "sync/atomic"
"syscall"
"time"
"unsafe"
@@ -17,18 +18,21 @@ type Process struct {
sync.Mutex
// command to execute
command *exec.Cmd
+ // rawCmd from the plugin
rawCmd string
+ // root plugin error chan
errCh chan error
+ // logger
log logger.Logger
ExecTimeout time.Duration
RestartAfterExit bool
RestartDelay time.Duration
- //
+ // process start time
startTime time.Time
- stopCh chan struct{}
+ stopped uint64
}
// NewServiceProcess constructs service process structure
@@ -39,7 +43,6 @@ func NewServiceProcess(restartAfterExit bool, execTimeout, restartDelay time.Dur
ExecTimeout: execTimeout,
RestartAfterExit: restartAfterExit,
errCh: errCh,
- stopCh: make(chan struct{}),
log: l,
}
}
@@ -53,24 +56,27 @@ func (p *Process) Write(b []byte) (int, error) {
func (p *Process) start() {
p.Lock()
defer p.Unlock()
-
const op = errors.Op("processor_start")
// crate fat-process here
p.createProcess()
+ // non blocking process start
err := p.command.Start()
if err != nil {
p.errCh <- errors.E(op, err)
return
}
+ // start process waiting routine
go p.wait()
+ // startExec
go p.execHandler()
// save start time
p.startTime = time.Now()
}
+// create command for the process
func (p *Process) createProcess() {
// cmdArgs contain command arguments if the command in form of: php <command> or ls <command> -i -b
var cmdArgs []string
@@ -80,9 +86,11 @@ func (p *Process) createProcess() {
} else {
p.command = exec.Command(cmdArgs[0], cmdArgs[1:]...) //nolint:gosec
}
+ // redirect stderr into the Write function of the process.go
p.command.Stderr = p
}
+// wait process for exit
func (p *Process) wait() {
// Wait error doesn't matter here
_ = p.command.Wait()
@@ -98,7 +106,7 @@ func (p *Process) wait() {
// stop can be only sent by the Endure when plugin stopped
func (p *Process) stop() {
- p.stopCh <- struct{}{}
+ atomic.StoreUint64(&p.stopped, 1)
}
func (p *Process) execHandler() {
@@ -110,22 +118,29 @@ func (p *Process) execHandler() {
p.Lock()
// if the exec timeout is set
if p.ExecTimeout != 0 {
+ // if stopped -> kill the process (SIGINT-> SIGKILL) and exit
+ if atomic.CompareAndSwapUint64(&p.stopped, 1, 1) {
+ err := p.command.Process.Signal(syscall.SIGINT)
+ if err != nil {
+ _ = p.command.Process.Signal(syscall.SIGKILL)
+ }
+ tt.Stop()
+ p.Unlock()
+ return
+ }
+
// check the running time for the script
if time.Now().After(p.startTime.Add(p.ExecTimeout)) {
err := p.command.Process.Signal(syscall.SIGINT)
if err != nil {
_ = p.command.Process.Signal(syscall.SIGKILL)
}
+ p.Unlock()
+ tt.Stop()
+ return
}
}
p.Unlock()
- case <-p.stopCh:
- err := p.command.Process.Signal(syscall.SIGINT)
- if err != nil {
- _ = p.command.Process.Signal(syscall.SIGKILL)
- }
- tt.Stop()
- return
}
}
}