summaryrefslogtreecommitdiff
path: root/socket_factory.go
blob: 5d1b488b4f36c5a677822b3924d48f030f8259e0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package roadrunner

import (
	"encoding/json"
	"fmt"
	"github.com/spiral/goridge"
	"net"
	"os"
	"os/exec"
	"sync"
	"time"
)

// SocketFactory connects to external workers using socket server.
// Worker processes connect back to the listener ls; each accepted connection is
// matched to the worker that spawned it by the pid the worker reports, using the
// per-pid channels stored in wait.
type SocketFactory struct {
	ls   net.Listener                      // listens for incoming connections from underlying processes
	tout time.Duration                     // how long NewWorker waits for a worker's relay connection
	mu   sync.Mutex                        // protects the wait map
	wait map[int]chan *goridge.SocketRelay // per-pid channels handing accepted relays to waiting workers
}

// NewSocketFactory builds a SocketFactory on top of the listener ls and starts a
// background goroutine that accepts worker connections from it. The goroutine
// runs until Accept on ls fails (for example after Close). tout limits how long
// NewWorker waits for each spawned worker to connect back.
func NewSocketFactory(ls net.Listener, tout time.Duration) *SocketFactory {
	factory := new(SocketFactory)
	factory.ls = ls
	factory.tout = tout
	factory.wait = make(map[int]chan *goridge.SocketRelay)

	go factory.listen()

	return factory
}

// NewWorker creates a worker for cmd, starts its process and waits (up to the
// factory timeout) for the process to connect back over the socket listener.
// On success the worker is attached to its relay and returned.
//
// If the relay does not arrive in time, the already-started process is killed
// before the error is returned. The caller receives no worker in that case, so
// it would otherwise have no way to stop the process.
func (f *SocketFactory) NewWorker(cmd *exec.Cmd) (w *Worker, err error) {
	if w, err = NewWorker(cmd); err != nil {
		return nil, err
	}

	if err = w.Start(); err != nil {
		return nil, err
	}

	// Check Process itself: the address of its Pid field can never be nil, and
	// dereferencing a nil Process to take that address would panic.
	if w.cmd.Process == nil {
		return nil, fmt.Errorf("can't to start worker %s", w)
	}
	w.Pid = &w.cmd.Process.Pid

	rl, err := f.waitRelay(*w.Pid, f.tout)
	if err != nil {
		// The process is running but never registered a relay. Kill it so it is
		// not orphaned; the relay failure is the error worth reporting, so the
		// kill error is deliberately ignored.
		// NOTE(review): reaping (cmd.Wait) is assumed to be handled by Worker —
		// confirm, otherwise the killed child lingers as a zombie.
		_ = w.cmd.Process.Kill()
		return nil, fmt.Errorf("can't connect to worker %s: %s", w, err)
	}

	w.attach(rl)

	return w, nil
}

// Close shuts down the factory's listener. Any pending Accept in the listening
// goroutine then fails, which ends that goroutine.
func (f *SocketFactory) Close() error {
	err := f.ls.Close()
	return err
}

// listen accepts connections from worker processes until Accept fails (for
// example after Close) and hands each one to serveConn in its own goroutine.
// The accept loop therefore never blocks on a slow handshake, or on a relay
// that no worker is waiting for.
func (f *SocketFactory) listen() {
	for {
		conn, err := f.ls.Accept()
		if err != nil {
			return
		}

		go f.serveConn(conn)
	}
}

// serveConn performs the pid handshake on conn and delivers the resulting relay
// to the worker waiting for that pid. The connection is closed if the handshake
// fails, or if no worker claims the relay within the factory timeout.
func (f *SocketFactory) serveConn(conn net.Conn) {
	// Bound the handshake so a client that connects and stays silent cannot
	// pin this goroutine forever.
	if err := conn.SetDeadline(time.Now().Add(f.tout)); err != nil {
		_ = conn.Close()
		return
	}

	rl := goridge.NewSocketRelay(conn)
	pid, err := fetchPID(rl)
	if err != nil {
		_ = conn.Close()
		return
	}

	// Clear the handshake deadline; the relay is long-lived from here on.
	if err := conn.SetDeadline(time.Time{}); err != nil {
		_ = conn.Close()
		return
	}

	timer := time.NewTimer(f.tout)
	defer timer.Stop()

	select {
	case f.relayChan(pid) <- rl:
	case <-timer.C:
		// Nobody claimed this relay (e.g. the worker's waitRelay already timed
		// out). Drop the pid entry and the connection instead of blocking.
		f.cleanChan(pid)
		_ = conn.Close()
	}
}

// waitRelay blocks until the worker process with the given pid connects back
// over the socket and returns its relay. If tout elapses first, it returns an
// error instead. The pid's entry in the wait map is removed on both paths, so
// timed-out pids do not accumulate in the map.
func (f *SocketFactory) waitRelay(pid int, tout time.Duration) (*goridge.SocketRelay, error) {
	timer := time.NewTimer(tout)
	defer timer.Stop()
	defer f.cleanChan(pid)

	select {
	case rl := <-f.relayChan(pid):
		return rl, nil
	case <-timer.C:
		return nil, fmt.Errorf("relay timer for [%v]", pid)
	}
}

// relayChan returns the channel that carries the relay for pid. If the wait map
// has no channel for pid yet, it creates an unbuffered one and registers it.
// The lookup and the insert both happen under f.mu.
func (f *SocketFactory) relayChan(pid int) chan *goridge.SocketRelay {
	f.mu.Lock()
	defer f.mu.Unlock()

	if ch, exists := f.wait[pid]; exists {
		return ch
	}

	ch := make(chan *goridge.SocketRelay)
	f.wait[pid] = ch
	return ch
}

// cleanChan removes the relay channel registered for pid from the wait map,
// holding f.mu while doing so. Removing a pid that is not present is a no-op.
func (f *SocketFactory) cleanChan(pid int) {
	f.mu.Lock()
	delete(f.wait, pid)
	f.mu.Unlock()
}

// fetchPID sends a PidCommand carrying this process's pid over rl and reads the
// worker's reply. It returns the pid the worker reports about itself.
//
// It returns an error if:
//   - the command cannot be sent or the reply cannot be read;
//   - the reply lacks the control flag;
//   - the reply does not decode as a PidCommand;
//   - the parent pid in the reply is not this process.
func fetchPID(rl goridge.Relay) (pid int, err error) {
	if err = sendCommand(rl, PidCommand{Pid: os.Getpid()}); err != nil {
		return 0, err
	}

	body, p, err := rl.Receive()
	if err != nil {
		// Don't inspect the prefix or body of a failed read.
		return 0, err
	}

	if !p.HasFlag(goridge.PayloadControl) {
		return 0, fmt.Errorf("unexpected response, `control` header is missing")
	}

	link := &PidCommand{}
	if err = json.Unmarshal(body, link); err != nil {
		return 0, err
	}

	if link.Parent != os.Getpid() {
		return 0, fmt.Errorf("integrity error, parent process does not match")
	}

	return link.Pid, nil
}