diff options
Diffstat (limited to 'socket_factory.go')
-rw-r--r-- | socket_factory.go | 94 |
1 files changed, 55 insertions, 39 deletions
diff --git a/socket_factory.go b/socket_factory.go index 28208327..b915973b 100644 --- a/socket_factory.go +++ b/socket_factory.go @@ -7,57 +7,66 @@ import ( "os/exec" "sync" "time" + "github.com/pkg/errors" ) // SocketFactory connects to external workers using socket server. type SocketFactory struct { - ls net.Listener // listens for incoming connections from underlying processes - tout time.Duration // connection timeout - mu sync.Mutex // protects socket mapping - wait map[int]chan *goridge.SocketRelay // sockets which are waiting for process association + // listens for incoming connections from underlying processes + ls net.Listener + + // relay connection timeout + tout time.Duration + + // protects socket mapping + mu sync.Mutex + + // sockets which are waiting for process association + relays map[int]chan *goridge.SocketRelay } -// NewSocketFactory returns SocketFactory attached to a given socket listener. tout specifies for how long factory -// should wait for incoming relay connection +// NewSocketFactory returns SocketFactory attached to a given socket listener. +// tout specifies for how long factory should serve for incoming relay connection func NewSocketFactory(ls net.Listener, tout time.Duration) *SocketFactory { f := &SocketFactory{ - ls: ls, - tout: tout, - wait: make(map[int]chan *goridge.SocketRelay), + ls: ls, + tout: tout, + relays: make(map[int]chan *goridge.SocketRelay), } go f.listen() + return f } -// NewWorker creates worker and connects it to appropriate relay or returns error -func (f *SocketFactory) NewWorker(cmd *exec.Cmd) (w *Worker, err error) { - w, err = newWorker(cmd) - if err != nil { - return nil, err +// SpawnWorker creates worker and connects it to appropriate relay or returns error +func (f *SocketFactory) SpawnWorker(cmd *exec.Cmd) (w *Worker, workerError error) { + if w, workerError = newWorker(cmd); workerError != nil { + return nil, workerError } if err := w.Start(); err != nil { - return nil, err + return nil, errors.Wrap(err, "process error") } - rl, err := f.waitRelay(*w.Pid, f.tout) + rl, err := f.findRelay(w, f.tout) if err != nil { - return nil, fmt.Errorf("unable to connect to worker: %s", err) + go func(w *Worker) { w.Kill() }(w) + + if wErr := w.Wait(); wErr != nil { + err = errors.Wrap(wErr, err.Error()) + } + + return nil, errors.Wrap(err, "unable to connect to worker") } - w.attach(rl) - w.st = newState(StateReady) + w.rl = rl + w.state.set(StateReady) return w, nil } -// Close closes all open factory descriptors. -func (f *SocketFactory) Close() error { - return f.ls.Close() -} - -// listen for incoming wait and associate sockets with active workers +// listens for incoming socket connections func (f *SocketFactory) listen() { for { conn, err := f.ls.Accept() @@ -66,23 +75,30 @@ func (f *SocketFactory) listen() { } rl := goridge.NewSocketRelay(conn) - if pid, err := fetchPid(rl); err == nil { + if pid, err := fetchPID(rl); err == nil { f.relayChan(pid) <- rl } } } // waits for worker to connect over socket and returns associated relay of timeout -func (f *SocketFactory) waitRelay(pid int, tout time.Duration) (*goridge.SocketRelay, error) { +func (f *SocketFactory) findRelay(w *Worker, tout time.Duration) (*goridge.SocketRelay, error) { timer := time.NewTimer(tout) - select { - case rl := <-f.relayChan(pid): - timer.Stop() - f.cleanChan(pid) - - return rl, nil - case <-timer.C: - return nil, fmt.Errorf("relay timeout") + for { + select { + case rl := <-f.relayChan(*w.Pid): + timer.Stop() + f.cleanChan(*w.Pid) + return rl, nil + + case <-timer.C: + return nil, fmt.Errorf("relay timeout") + + case <-w.waitDone: + timer.Stop() + f.cleanChan(*w.Pid) + return nil, fmt.Errorf("worker is gone") + } } } @@ -91,10 +107,10 @@ func (f *SocketFactory) relayChan(pid int) chan *goridge.SocketRelay { f.mu.Lock() defer f.mu.Unlock() - rl, ok := f.wait[pid] + rl, ok := f.relays[pid] if !ok { - f.wait[pid] = make(chan *goridge.SocketRelay) - return f.wait[pid] + f.relays[pid] = make(chan *goridge.SocketRelay) + return f.relays[pid] } return rl @@ -105,5 +121,5 @@ func (f *SocketFactory) cleanChan(pid int) { f.mu.Lock() defer f.mu.Unlock() - delete(f.wait, pid) + delete(f.relays, pid) } |