summaryrefslogtreecommitdiff
path: root/socket_factory.go
diff options
context:
space:
mode:
Diffstat (limited to 'socket_factory.go')
-rw-r--r--socket_factory.go94
1 files changed, 55 insertions, 39 deletions
diff --git a/socket_factory.go b/socket_factory.go
index 28208327..b915973b 100644
--- a/socket_factory.go
+++ b/socket_factory.go
@@ -7,57 +7,66 @@ import (
"os/exec"
"sync"
"time"
+ "github.com/pkg/errors"
)
// SocketFactory connects to external workers using socket server.
type SocketFactory struct {
- ls net.Listener // listens for incoming connections from underlying processes
- tout time.Duration // connection timeout
- mu sync.Mutex // protects socket mapping
- wait map[int]chan *goridge.SocketRelay // sockets which are waiting for process association
+ // listens for incoming connections from underlying processes
+ ls net.Listener
+
+ // relay connection timeout
+ tout time.Duration
+
+ // protects socket mapping
+ mu sync.Mutex
+
+ // sockets which are waiting for process association
+ relays map[int]chan *goridge.SocketRelay
}
-// NewSocketFactory returns SocketFactory attached to a given socket listener. tout specifies for how long factory
-// should wait for incoming relay connection
+// NewSocketFactory returns SocketFactory attached to a given socket listener.
+// tout specifies for how long factory should serve for incoming relay connection
func NewSocketFactory(ls net.Listener, tout time.Duration) *SocketFactory {
f := &SocketFactory{
- ls: ls,
- tout: tout,
- wait: make(map[int]chan *goridge.SocketRelay),
+ ls: ls,
+ tout: tout,
+ relays: make(map[int]chan *goridge.SocketRelay),
}
go f.listen()
+
return f
}
-// NewWorker creates worker and connects it to appropriate relay or returns error
-func (f *SocketFactory) NewWorker(cmd *exec.Cmd) (w *Worker, err error) {
- w, err = newWorker(cmd)
- if err != nil {
- return nil, err
+// SpawnWorker creates worker and connects it to appropriate relay or returns error
+func (f *SocketFactory) SpawnWorker(cmd *exec.Cmd) (w *Worker, workerError error) {
+ if w, workerError = newWorker(cmd); workerError != nil {
+ return nil, workerError
}
if err := w.Start(); err != nil {
- return nil, err
+ return nil, errors.Wrap(err, "process error")
}
- rl, err := f.waitRelay(*w.Pid, f.tout)
+ rl, err := f.findRelay(w, f.tout)
if err != nil {
- return nil, fmt.Errorf("unable to connect to worker: %s", err)
+ go func(w *Worker) { w.Kill() }(w)
+
+ if wErr := w.Wait(); wErr != nil {
+ err = errors.Wrap(wErr, err.Error())
+ }
+
+ return nil, errors.Wrap(err, "unable to connect to worker")
}
- w.attach(rl)
- w.st = newState(StateReady)
+ w.rl = rl
+ w.state.set(StateReady)
return w, nil
}
-// Close closes all open factory descriptors.
-func (f *SocketFactory) Close() error {
- return f.ls.Close()
-}
-
-// listen for incoming wait and associate sockets with active workers
+// listens for incoming socket connections
func (f *SocketFactory) listen() {
for {
conn, err := f.ls.Accept()
@@ -66,23 +75,30 @@ func (f *SocketFactory) listen() {
}
rl := goridge.NewSocketRelay(conn)
- if pid, err := fetchPid(rl); err == nil {
+ if pid, err := fetchPID(rl); err == nil {
f.relayChan(pid) <- rl
}
}
}
// waits for worker to connect over socket and returns associated relay of timeout
-func (f *SocketFactory) waitRelay(pid int, tout time.Duration) (*goridge.SocketRelay, error) {
+func (f *SocketFactory) findRelay(w *Worker, tout time.Duration) (*goridge.SocketRelay, error) {
timer := time.NewTimer(tout)
- select {
- case rl := <-f.relayChan(pid):
- timer.Stop()
- f.cleanChan(pid)
-
- return rl, nil
- case <-timer.C:
- return nil, fmt.Errorf("relay timeout")
+ for {
+ select {
+ case rl := <-f.relayChan(*w.Pid):
+ timer.Stop()
+ f.cleanChan(*w.Pid)
+ return rl, nil
+
+ case <-timer.C:
+ return nil, fmt.Errorf("relay timeout")
+
+ case <-w.waitDone:
+ timer.Stop()
+ f.cleanChan(*w.Pid)
+ return nil, fmt.Errorf("worker is gone")
+ }
}
}
@@ -91,10 +107,10 @@ func (f *SocketFactory) relayChan(pid int) chan *goridge.SocketRelay {
f.mu.Lock()
defer f.mu.Unlock()
- rl, ok := f.wait[pid]
+ rl, ok := f.relays[pid]
if !ok {
- f.wait[pid] = make(chan *goridge.SocketRelay)
- return f.wait[pid]
+ f.relays[pid] = make(chan *goridge.SocketRelay)
+ return f.relays[pid]
}
return rl
@@ -105,5 +121,5 @@ func (f *SocketFactory) cleanChan(pid int) {
f.mu.Lock()
defer f.mu.Unlock()
- delete(f.wait, pid)
+ delete(f.relays, pid)
}