diff options
author | Valery Piashchynski <[email protected]> | 2021-12-15 00:12:23 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-12-15 00:12:23 +0300 |
commit | b4e4f7e7e60bff48a63df4a3c606398ea2a32d8a (patch) | |
tree | d018a39795b94f61e1dadde54ce3382fc7e039b0 /worker | |
parent | f2c79017ae5759256b03ec58b608f298a29e4b96 (diff) |
Update static_pool and worker to wait response from the worker
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'worker')
-rwxr-xr-x | worker/sync_worker.go | 32 | ||||
-rwxr-xr-x | worker/worker.go | 30 |
2 files changed, 47 insertions, 15 deletions
diff --git a/worker/sync_worker.go b/worker/sync_worker.go index 81d8c5bf..e3e85ba6 100755 --- a/worker/sync_worker.go +++ b/worker/sync_worker.go @@ -20,6 +20,7 @@ type SyncWorkerImpl struct { process *Process fPool sync.Pool bPool sync.Pool + chPool sync.Pool } // From creates SyncWorker from BaseProcess @@ -32,12 +33,17 @@ func From(process *Process) *SyncWorkerImpl { bPool: sync.Pool{New: func() interface{} { return new(bytes.Buffer) }}, + + chPool: sync.Pool{New: func() interface{} { + return make(chan wexec, 1) + }}, } } // Exec payload without TTL timeout. func (tw *SyncWorkerImpl) Exec(p *payload.Payload) (*payload.Payload, error) { const op = errors.Op("sync_worker_exec") + if len(p.Body) == 0 && len(p.Context) == 0 { return nil, errors.E(op, errors.Str("payload can not be empty")) } @@ -81,7 +87,13 @@ type wexec struct { // ExecWithTTL executes payload without TTL timeout. func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) { const op = errors.Op("sync_worker_exec_worker_with_timeout") - c := make(chan wexec, 1) + + if len(p.Body) == 0 && len(p.Context) == 0 { + return nil, errors.E(op, errors.Str("payload can not be empty")) + } + + c := tw.getCh() + defer tw.putCh(c) // worker was killed before it started to work (supervisor) if tw.process.State().Value() != StateReady { @@ -91,10 +103,6 @@ func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p *payload.Payload) ( tw.process.State().SetLastUsed(uint64(time.Now().UnixNano())) tw.process.State().Set(StateWorking) - if len(p.Body) == 0 && len(p.Context) == 0 { - return nil, errors.E(op, errors.Str("payload can not be empty")) - } - go func() { rsp, err := tw.execPayload(p) if err != nil { @@ -271,3 +279,17 @@ func (tw *SyncWorkerImpl) putFrame(f *frame.Frame) { f.Reset() tw.fPool.Put(f) } + +func (tw *SyncWorkerImpl) getCh() chan wexec { + return tw.chPool.Get().(chan wexec) +} + +func (tw *SyncWorkerImpl) putCh(ch chan wexec) { + // just check if the chan is not empty + select { + case <-ch: + tw.chPool.Put(ch) + default: + tw.chPool.Put(ch) + } +} diff --git a/worker/worker.go b/worker/worker.go index e5c3a192..0e62651d 100755 --- a/worker/worker.go +++ b/worker/worker.go @@ -44,7 +44,8 @@ type Process struct { // pid of the process, points to pid of underlying process and // can be nil while process is not started. - pid int + pid int + doneCh chan struct{} // communication bus with underlying process. relay relay.Relay @@ -63,6 +64,7 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) { eventsID: id, cmd: cmd, state: NewWorkerState(StateInactive), + doneCh: make(chan struct{}, 1), } // set self as stderr implementation (Writer interface) @@ -136,6 +138,7 @@ func (w *Process) Wait() error { var err error err = w.cmd.Wait() defer w.events.Unsubscribe(w.eventsID) + w.doneCh <- struct{}{} // If worker was destroyed, just exit if w.State().Value() == StateDestroyed { @@ -180,18 +183,25 @@ func (w *Process) closeRelay() error { func (w *Process) Stop() error { const op = errors.Op("process_stop") w.state.Set(StateStopping) - err := internal.SendControl(w.relay, &internal.StopCommand{Stop: true}) - if err != nil { - w.state.Set(StateKilling) - _ = w.cmd.Process.Signal(os.Kill) + select { + // finished + case <-w.doneCh: + return nil + default: + err := internal.SendControl(w.relay, &internal.StopCommand{Stop: true}) + if err != nil { + w.state.Set(StateKilling) + _ = w.cmd.Process.Signal(os.Kill) + + w.events.Unsubscribe(w.eventsID) + return errors.E(op, errors.Network, err) + } + <-w.doneCh + w.state.Set(StateStopped) w.events.Unsubscribe(w.eventsID) - return errors.E(op, errors.Network, err) + return nil } - - w.state.Set(StateStopped) - w.events.Unsubscribe(w.eventsID) - return nil } // Kill kills underlying process, make sure to call Wait() func to gather |