summaryrefslogtreecommitdiff
path: root/worker
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-12-15 00:12:23 +0300
committerValery Piashchynski <[email protected]>2021-12-15 00:12:23 +0300
commitb4e4f7e7e60bff48a63df4a3c606398ea2a32d8a (patch)
treed018a39795b94f61e1dadde54ce3382fc7e039b0 /worker
parentf2c79017ae5759256b03ec58b608f298a29e4b96 (diff)
Update static_pool and worker to wait response from the worker
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'worker')
-rwxr-xr-xworker/sync_worker.go32
-rwxr-xr-xworker/worker.go30
2 files changed, 47 insertions, 15 deletions
diff --git a/worker/sync_worker.go b/worker/sync_worker.go
index 81d8c5bf..e3e85ba6 100755
--- a/worker/sync_worker.go
+++ b/worker/sync_worker.go
@@ -20,6 +20,7 @@ type SyncWorkerImpl struct {
process *Process
fPool sync.Pool
bPool sync.Pool
+ chPool sync.Pool
}
// From creates SyncWorker from BaseProcess
@@ -32,12 +33,17 @@ func From(process *Process) *SyncWorkerImpl {
bPool: sync.Pool{New: func() interface{} {
return new(bytes.Buffer)
}},
+
+ chPool: sync.Pool{New: func() interface{} {
+ return make(chan wexec, 1)
+ }},
}
}
// Exec payload without TTL timeout.
func (tw *SyncWorkerImpl) Exec(p *payload.Payload) (*payload.Payload, error) {
const op = errors.Op("sync_worker_exec")
+
if len(p.Body) == 0 && len(p.Context) == 0 {
return nil, errors.E(op, errors.Str("payload can not be empty"))
}
@@ -81,7 +87,13 @@ type wexec struct {
// ExecWithTTL executes payload without TTL timeout.
func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) {
const op = errors.Op("sync_worker_exec_worker_with_timeout")
- c := make(chan wexec, 1)
+
+ if len(p.Body) == 0 && len(p.Context) == 0 {
+ return nil, errors.E(op, errors.Str("payload can not be empty"))
+ }
+
+ c := tw.getCh()
+ defer tw.putCh(c)
// worker was killed before it started to work (supervisor)
if tw.process.State().Value() != StateReady {
@@ -91,10 +103,6 @@ func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p *payload.Payload) (
tw.process.State().SetLastUsed(uint64(time.Now().UnixNano()))
tw.process.State().Set(StateWorking)
- if len(p.Body) == 0 && len(p.Context) == 0 {
- return nil, errors.E(op, errors.Str("payload can not be empty"))
- }
-
go func() {
rsp, err := tw.execPayload(p)
if err != nil {
@@ -271,3 +279,17 @@ func (tw *SyncWorkerImpl) putFrame(f *frame.Frame) {
f.Reset()
tw.fPool.Put(f)
}
+
+func (tw *SyncWorkerImpl) getCh() chan wexec {
+ return tw.chPool.Get().(chan wexec)
+}
+
+func (tw *SyncWorkerImpl) putCh(ch chan wexec) {
+ // just check if the chan is not empty
+ select {
+ case <-ch:
+ tw.chPool.Put(ch)
+ default:
+ tw.chPool.Put(ch)
+ }
+}
diff --git a/worker/worker.go b/worker/worker.go
index e5c3a192..0e62651d 100755
--- a/worker/worker.go
+++ b/worker/worker.go
@@ -44,7 +44,8 @@ type Process struct {
// pid of the process, points to pid of underlying process and
// can be nil while process is not started.
- pid int
+ pid int
+ doneCh chan struct{}
// communication bus with underlying process.
relay relay.Relay
@@ -63,6 +64,7 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) {
eventsID: id,
cmd: cmd,
state: NewWorkerState(StateInactive),
+ doneCh: make(chan struct{}, 1),
}
// set self as stderr implementation (Writer interface)
@@ -136,6 +138,7 @@ func (w *Process) Wait() error {
var err error
err = w.cmd.Wait()
defer w.events.Unsubscribe(w.eventsID)
+ w.doneCh <- struct{}{}
// If worker was destroyed, just exit
if w.State().Value() == StateDestroyed {
@@ -180,18 +183,25 @@ func (w *Process) closeRelay() error {
func (w *Process) Stop() error {
const op = errors.Op("process_stop")
w.state.Set(StateStopping)
- err := internal.SendControl(w.relay, &internal.StopCommand{Stop: true})
- if err != nil {
- w.state.Set(StateKilling)
- _ = w.cmd.Process.Signal(os.Kill)
+ select {
+ // finished
+ case <-w.doneCh:
+ return nil
+ default:
+ err := internal.SendControl(w.relay, &internal.StopCommand{Stop: true})
+ if err != nil {
+ w.state.Set(StateKilling)
+ _ = w.cmd.Process.Signal(os.Kill)
+
+ w.events.Unsubscribe(w.eventsID)
+ return errors.E(op, errors.Network, err)
+ }
+ <-w.doneCh
+ w.state.Set(StateStopped)
w.events.Unsubscribe(w.eventsID)
- return errors.E(op, errors.Network, err)
+ return nil
}
-
- w.state.Set(StateStopped)
- w.events.Unsubscribe(w.eventsID)
- return nil
}
// Kill kills underlying process, make sure to call Wait() func to gather