diff options
author | Wolfy-J <[email protected]> | 2018-01-23 19:51:15 -0500 |
---|---|---|
committer | Wolfy-J <[email protected]> | 2018-01-23 19:51:15 -0500 |
commit | 78a42de837928cf7d10a1ae04d7e82e56d66e1e2 (patch) | |
tree | 8882b9a051bcc9c42328df583c0bb8c39a89591e /socket_factory.go | |
parent | fa4bd78d9f7c5f74e8445374370927c742fc4e78 (diff) |
API update
Diffstat (limited to 'socket_factory.go')
-rw-r--r-- | socket_factory.go | 121 |
1 files changed, 54 insertions, 67 deletions
diff --git a/socket_factory.go b/socket_factory.go index 5d1b488b..b915973b 100644 --- a/socket_factory.go +++ b/socket_factory.go @@ -1,69 +1,72 @@ package roadrunner import ( - "encoding/json" "fmt" "github.com/spiral/goridge" "net" - "os" "os/exec" "sync" "time" + "github.com/pkg/errors" ) // SocketFactory connects to external workers using socket server. type SocketFactory struct { - ls net.Listener // listens for incoming connections from underlying processes - tout time.Duration // connection timeout - mu sync.Mutex // protects socket mapping - wait map[int]chan *goridge.SocketRelay // sockets which are waiting for process association + // listens for incoming connections from underlying processes + ls net.Listener + + // relay connection timeout + tout time.Duration + + // protects socket mapping + mu sync.Mutex + + // sockets which are waiting for process association + relays map[int]chan *goridge.SocketRelay } -// NewSocketFactory returns SocketFactory attached to a given socket listener. tout specifies for how long factory -// should wait for incoming relay connection +// NewSocketFactory returns SocketFactory attached to a given socket listener. +// tout specifies for how long factory should serve for incoming relay connection func NewSocketFactory(ls net.Listener, tout time.Duration) *SocketFactory { f := &SocketFactory{ - ls: ls, - tout: tout, - wait: make(map[int]chan *goridge.SocketRelay), + ls: ls, + tout: tout, + relays: make(map[int]chan *goridge.SocketRelay), } go f.listen() + return f } -// NewWorker creates worker and connects it to appropriate relay or returns error -func (f *SocketFactory) NewWorker(cmd *exec.Cmd) (w *Worker, err error) { - w, err = NewWorker(cmd) - if err != nil { - return nil, err +// SpawnWorker creates worker and connects it to appropriate relay or returns error +func (f *SocketFactory) SpawnWorker(cmd *exec.Cmd) (w *Worker, workerError error) { + if w, workerError = newWorker(cmd); workerError != nil { + return nil, workerError } if err := w.Start(); err != nil { - return nil, err - } - - w.Pid = &w.cmd.Process.Pid - if w.Pid == nil { - return nil, fmt.Errorf("can't to start worker %s", w) + return nil, errors.Wrap(err, "process error") } - rl, err := f.waitRelay(*w.Pid, f.tout) + rl, err := f.findRelay(w, f.tout) if err != nil { - return nil, fmt.Errorf("can't connect to worker %s: %s", w, err) + go func(w *Worker) { w.Kill() }(w) + + if wErr := w.Wait(); wErr != nil { + err = errors.Wrap(wErr, err.Error()) + } + + return nil, errors.Wrap(err, "unable to connect to worker") } - w.attach(rl) + w.rl = rl + w.state.set(StateReady) return w, nil } -// Close closes all open factory descriptors. -func (f *SocketFactory) Close() error { - return f.ls.Close() -} - -// listen for incoming wait and associate sockets with active workers +// listens for incoming socket connections func (f *SocketFactory) listen() { for { conn, err := f.ls.Accept() @@ -79,16 +82,23 @@ func (f *SocketFactory) listen() { } // waits for worker to connect over socket and returns associated relay of timeout -func (f *SocketFactory) waitRelay(pid int, tout time.Duration) (*goridge.SocketRelay, error) { +func (f *SocketFactory) findRelay(w *Worker, tout time.Duration) (*goridge.SocketRelay, error) { timer := time.NewTimer(tout) - select { - case rl := <-f.relayChan(pid): - timer.Stop() - f.cleanChan(pid) - - return rl, nil - case <-timer.C: - return nil, fmt.Errorf("relay timer for [%v]", pid) + for { + select { + case rl := <-f.relayChan(*w.Pid): + timer.Stop() + f.cleanChan(*w.Pid) + return rl, nil + + case <-timer.C: + return nil, fmt.Errorf("relay timeout") + + case <-w.waitDone: + timer.Stop() + f.cleanChan(*w.Pid) + return nil, fmt.Errorf("worker is gone") + } } } @@ -97,10 +107,10 @@ func (f *SocketFactory) relayChan(pid int) chan *goridge.SocketRelay { f.mu.Lock() defer f.mu.Unlock() - rl, ok := f.wait[pid] + rl, ok := f.relays[pid] if !ok { - f.wait[pid] = make(chan *goridge.SocketRelay) - return f.wait[pid] + f.relays[pid] = make(chan *goridge.SocketRelay) + return f.relays[pid] } return rl @@ -111,28 +121,5 @@ func (f *SocketFactory) cleanChan(pid int) { f.mu.Lock() defer f.mu.Unlock() - delete(f.wait, pid) -} - -// send control command to relay and return associated Pid (or error) -func fetchPID(rl goridge.Relay) (pid int, err error) { - if err := sendCommand(rl, PidCommand{Pid: os.Getpid()}); err != nil { - return 0, err - } - - body, p, err := rl.Receive() - if !p.HasFlag(goridge.PayloadControl) { - return 0, fmt.Errorf("unexpected response, `control` header is missing") - } - - link := &PidCommand{} - if err := json.Unmarshal(body, link); err != nil { - return 0, err - } - - if link.Parent != os.Getpid() { - return 0, fmt.Errorf("integrity error, parent process does not match") - } - - return link.Pid, nil + delete(f.relays, pid) } |