diff options
author | Wolfy-J <[email protected]> | 2017-12-26 19:14:53 +0300 |
---|---|---|
committer | Wolfy-J <[email protected]> | 2017-12-26 19:14:53 +0300 |
commit | e229d83dea4bbe9d0cfe6569c8fbe239690aafb9 (patch) | |
tree | 2d4887ffdb167d660b705415f0617458490d0b9f /socket_factory.go |
init
Diffstat (limited to 'socket_factory.go')
-rw-r--r-- | socket_factory.go | 138 |
1 files changed, 138 insertions, 0 deletions
diff --git a/socket_factory.go b/socket_factory.go new file mode 100644 index 00000000..5d1b488b --- /dev/null +++ b/socket_factory.go @@ -0,0 +1,138 @@ +package roadrunner + +import ( + "encoding/json" + "fmt" + "github.com/spiral/goridge" + "net" + "os" + "os/exec" + "sync" + "time" +) + +// SocketFactory connects to external workers using socket server. +type SocketFactory struct { + ls net.Listener // listens for incoming connections from underlying processes + tout time.Duration // connection timeout + mu sync.Mutex // protects socket mapping + wait map[int]chan *goridge.SocketRelay // sockets which are waiting for process association +} + +// NewSocketFactory returns SocketFactory attached to a given socket listener. tout specifies for how long factory +// should wait for incoming relay connection +func NewSocketFactory(ls net.Listener, tout time.Duration) *SocketFactory { + f := &SocketFactory{ + ls: ls, + tout: tout, + wait: make(map[int]chan *goridge.SocketRelay), + } + + go f.listen() + return f +} + +// NewWorker creates worker and connects it to appropriate relay or returns error +func (f *SocketFactory) NewWorker(cmd *exec.Cmd) (w *Worker, err error) { + w, err = NewWorker(cmd) + if err != nil { + return nil, err + } + + if err := w.Start(); err != nil { + return nil, err + } + + w.Pid = &w.cmd.Process.Pid + if w.Pid == nil { + return nil, fmt.Errorf("can't to start worker %s", w) + } + + rl, err := f.waitRelay(*w.Pid, f.tout) + if err != nil { + return nil, fmt.Errorf("can't connect to worker %s: %s", w, err) + } + + w.attach(rl) + + return w, nil +} + +// Close closes all open factory descriptors. +func (f *SocketFactory) Close() error { + return f.ls.Close() +} + +// listen for incoming wait and associate sockets with active workers +func (f *SocketFactory) listen() { + for { + conn, err := f.ls.Accept() + if err != nil { + return + } + + rl := goridge.NewSocketRelay(conn) + if pid, err := fetchPID(rl); err == nil { + f.relayChan(pid) <- rl + } + } +} + +// waits for worker to connect over socket and returns associated relay of timeout +func (f *SocketFactory) waitRelay(pid int, tout time.Duration) (*goridge.SocketRelay, error) { + timer := time.NewTimer(tout) + select { + case rl := <-f.relayChan(pid): + timer.Stop() + f.cleanChan(pid) + + return rl, nil + case <-timer.C: + return nil, fmt.Errorf("relay timer for [%v]", pid) + } +} + +// chan to store relay associated with specific Pid +func (f *SocketFactory) relayChan(pid int) chan *goridge.SocketRelay { + f.mu.Lock() + defer f.mu.Unlock() + + rl, ok := f.wait[pid] + if !ok { + f.wait[pid] = make(chan *goridge.SocketRelay) + return f.wait[pid] + } + + return rl +} + +// deletes relay chan associated with specific Pid +func (f *SocketFactory) cleanChan(pid int) { + f.mu.Lock() + defer f.mu.Unlock() + + delete(f.wait, pid) +} + +// send control command to relay and return associated Pid (or error) +func fetchPID(rl goridge.Relay) (pid int, err error) { + if err := sendCommand(rl, PidCommand{Pid: os.Getpid()}); err != nil { + return 0, err + } + + body, p, err := rl.Receive() + if !p.HasFlag(goridge.PayloadControl) { + return 0, fmt.Errorf("unexpected response, `control` header is missing") + } + + link := &PidCommand{} + if err := json.Unmarshal(body, link); err != nil { + return 0, err + } + + if link.Parent != os.Getpid() { + return 0, fmt.Errorf("integrity error, parent process does not match") + } + + return link.Pid, nil +} |