summaryrefslogtreecommitdiff
path: root/socket_factory.go
diff options
context:
space:
mode:
authorWolfy-J <[email protected]>2017-12-26 19:14:53 +0300
committerWolfy-J <[email protected]>2017-12-26 19:14:53 +0300
commite229d83dea4bbe9d0cfe6569c8fbe239690aafb9 (patch)
tree2d4887ffdb167d660b705415f0617458490d0b9f /socket_factory.go
init
Diffstat (limited to 'socket_factory.go')
-rw-r--r--socket_factory.go138
1 files changed, 138 insertions, 0 deletions
diff --git a/socket_factory.go b/socket_factory.go
new file mode 100644
index 00000000..5d1b488b
--- /dev/null
+++ b/socket_factory.go
@@ -0,0 +1,138 @@
+package roadrunner
+
+import (
+ "encoding/json"
+ "fmt"
+ "github.com/spiral/goridge"
+ "net"
+ "os"
+ "os/exec"
+ "sync"
+ "time"
+)
+
+// SocketFactory connects to external workers using socket server.
+type SocketFactory struct {
+ ls net.Listener // listens for incoming connections from underlying processes
+ tout time.Duration // connection timeout
+ mu sync.Mutex // protects socket mapping
+ wait map[int]chan *goridge.SocketRelay // sockets which are waiting for process association
+}
+
+// NewSocketFactory returns SocketFactory attached to a given socket listener. tout specifies for how long factory
+// should wait for incoming relay connection
+func NewSocketFactory(ls net.Listener, tout time.Duration) *SocketFactory {
+ f := &SocketFactory{
+ ls: ls,
+ tout: tout,
+ wait: make(map[int]chan *goridge.SocketRelay),
+ }
+
+ go f.listen()
+ return f
+}
+
+// NewWorker creates worker and connects it to appropriate relay or returns error
+func (f *SocketFactory) NewWorker(cmd *exec.Cmd) (w *Worker, err error) {
+ w, err = NewWorker(cmd)
+ if err != nil {
+ return nil, err
+ }
+
+ if err := w.Start(); err != nil {
+ return nil, err
+ }
+
+ w.Pid = &w.cmd.Process.Pid
+ if w.Pid == nil {
+ return nil, fmt.Errorf("can't to start worker %s", w)
+ }
+
+ rl, err := f.waitRelay(*w.Pid, f.tout)
+ if err != nil {
+ return nil, fmt.Errorf("can't connect to worker %s: %s", w, err)
+ }
+
+ w.attach(rl)
+
+ return w, nil
+}
+
+// Close closes all open factory descriptors.
+func (f *SocketFactory) Close() error {
+ return f.ls.Close()
+}
+
+// listen for incoming wait and associate sockets with active workers
+func (f *SocketFactory) listen() {
+ for {
+ conn, err := f.ls.Accept()
+ if err != nil {
+ return
+ }
+
+ rl := goridge.NewSocketRelay(conn)
+ if pid, err := fetchPID(rl); err == nil {
+ f.relayChan(pid) <- rl
+ }
+ }
+}
+
+// waits for worker to connect over socket and returns associated relay of timeout
+func (f *SocketFactory) waitRelay(pid int, tout time.Duration) (*goridge.SocketRelay, error) {
+ timer := time.NewTimer(tout)
+ select {
+ case rl := <-f.relayChan(pid):
+ timer.Stop()
+ f.cleanChan(pid)
+
+ return rl, nil
+ case <-timer.C:
+ return nil, fmt.Errorf("relay timer for [%v]", pid)
+ }
+}
+
+// chan to store relay associated with specific Pid
+func (f *SocketFactory) relayChan(pid int) chan *goridge.SocketRelay {
+ f.mu.Lock()
+ defer f.mu.Unlock()
+
+ rl, ok := f.wait[pid]
+ if !ok {
+ f.wait[pid] = make(chan *goridge.SocketRelay)
+ return f.wait[pid]
+ }
+
+ return rl
+}
+
+// deletes relay chan associated with specific Pid
+func (f *SocketFactory) cleanChan(pid int) {
+ f.mu.Lock()
+ defer f.mu.Unlock()
+
+ delete(f.wait, pid)
+}
+
+// send control command to relay and return associated Pid (or error)
+func fetchPID(rl goridge.Relay) (pid int, err error) {
+ if err := sendCommand(rl, PidCommand{Pid: os.Getpid()}); err != nil {
+ return 0, err
+ }
+
+ body, p, err := rl.Receive()
+ if !p.HasFlag(goridge.PayloadControl) {
+ return 0, fmt.Errorf("unexpected response, `control` header is missing")
+ }
+
+ link := &PidCommand{}
+ if err := json.Unmarshal(body, link); err != nil {
+ return 0, err
+ }
+
+ if link.Parent != os.Getpid() {
+ return 0, fmt.Errorf("integrity error, parent process does not match")
+ }
+
+ return link.Pid, nil
+}