summaryrefslogtreecommitdiff
path: root/socket_factory.go
diff options
context:
space:
mode:
authorWolfy-J <[email protected]>2018-01-23 19:51:15 -0500
committerWolfy-J <[email protected]>2018-01-23 19:51:15 -0500
commit78a42de837928cf7d10a1ae04d7e82e56d66e1e2 (patch)
tree8882b9a051bcc9c42328df583c0bb8c39a89591e /socket_factory.go
parentfa4bd78d9f7c5f74e8445374370927c742fc4e78 (diff)
API update
Diffstat (limited to 'socket_factory.go')
-rw-r--r--socket_factory.go121
1 files changed, 54 insertions, 67 deletions
diff --git a/socket_factory.go b/socket_factory.go
index 5d1b488b..b915973b 100644
--- a/socket_factory.go
+++ b/socket_factory.go
@@ -1,69 +1,72 @@
package roadrunner
import (
- "encoding/json"
"fmt"
"github.com/spiral/goridge"
"net"
- "os"
"os/exec"
"sync"
"time"
+ "github.com/pkg/errors"
)
// SocketFactory connects to external workers using socket server.
type SocketFactory struct {
- ls net.Listener // listens for incoming connections from underlying processes
- tout time.Duration // connection timeout
- mu sync.Mutex // protects socket mapping
- wait map[int]chan *goridge.SocketRelay // sockets which are waiting for process association
+ // listens for incoming connections from underlying processes
+ ls net.Listener
+
+ // relay connection timeout
+ tout time.Duration
+
+ // protects socket mapping
+ mu sync.Mutex
+
+ // sockets which are waiting for process association
+ relays map[int]chan *goridge.SocketRelay
}
-// NewSocketFactory returns SocketFactory attached to a given socket listener. tout specifies for how long factory
-// should wait for incoming relay connection
+// NewSocketFactory returns SocketFactory attached to a given socket listener.
+// tout specifies for how long factory should serve for incoming relay connection
func NewSocketFactory(ls net.Listener, tout time.Duration) *SocketFactory {
f := &SocketFactory{
- ls: ls,
- tout: tout,
- wait: make(map[int]chan *goridge.SocketRelay),
+ ls: ls,
+ tout: tout,
+ relays: make(map[int]chan *goridge.SocketRelay),
}
go f.listen()
+
return f
}
-// NewWorker creates worker and connects it to appropriate relay or returns error
-func (f *SocketFactory) NewWorker(cmd *exec.Cmd) (w *Worker, err error) {
- w, err = NewWorker(cmd)
- if err != nil {
- return nil, err
+// SpawnWorker creates worker and connects it to appropriate relay or returns error
+func (f *SocketFactory) SpawnWorker(cmd *exec.Cmd) (w *Worker, workerError error) {
+ if w, workerError = newWorker(cmd); workerError != nil {
+ return nil, workerError
}
if err := w.Start(); err != nil {
- return nil, err
- }
-
- w.Pid = &w.cmd.Process.Pid
- if w.Pid == nil {
- return nil, fmt.Errorf("can't to start worker %s", w)
+ return nil, errors.Wrap(err, "process error")
}
- rl, err := f.waitRelay(*w.Pid, f.tout)
+ rl, err := f.findRelay(w, f.tout)
if err != nil {
- return nil, fmt.Errorf("can't connect to worker %s: %s", w, err)
+ go func(w *Worker) { w.Kill() }(w)
+
+ if wErr := w.Wait(); wErr != nil {
+ err = errors.Wrap(wErr, err.Error())
+ }
+
+ return nil, errors.Wrap(err, "unable to connect to worker")
}
- w.attach(rl)
+ w.rl = rl
+ w.state.set(StateReady)
return w, nil
}
-// Close closes all open factory descriptors.
-func (f *SocketFactory) Close() error {
- return f.ls.Close()
-}
-
-// listen for incoming wait and associate sockets with active workers
+// listens for incoming socket connections
func (f *SocketFactory) listen() {
for {
conn, err := f.ls.Accept()
@@ -79,16 +82,23 @@ func (f *SocketFactory) listen() {
}
// waits for worker to connect over socket and returns associated relay of timeout
-func (f *SocketFactory) waitRelay(pid int, tout time.Duration) (*goridge.SocketRelay, error) {
+func (f *SocketFactory) findRelay(w *Worker, tout time.Duration) (*goridge.SocketRelay, error) {
timer := time.NewTimer(tout)
- select {
- case rl := <-f.relayChan(pid):
- timer.Stop()
- f.cleanChan(pid)
-
- return rl, nil
- case <-timer.C:
- return nil, fmt.Errorf("relay timer for [%v]", pid)
+ for {
+ select {
+ case rl := <-f.relayChan(*w.Pid):
+ timer.Stop()
+ f.cleanChan(*w.Pid)
+ return rl, nil
+
+ case <-timer.C:
+ return nil, fmt.Errorf("relay timeout")
+
+ case <-w.waitDone:
+ timer.Stop()
+ f.cleanChan(*w.Pid)
+ return nil, fmt.Errorf("worker is gone")
+ }
}
}
@@ -97,10 +107,10 @@ func (f *SocketFactory) relayChan(pid int) chan *goridge.SocketRelay {
f.mu.Lock()
defer f.mu.Unlock()
- rl, ok := f.wait[pid]
+ rl, ok := f.relays[pid]
if !ok {
- f.wait[pid] = make(chan *goridge.SocketRelay)
- return f.wait[pid]
+ f.relays[pid] = make(chan *goridge.SocketRelay)
+ return f.relays[pid]
}
return rl
@@ -111,28 +121,5 @@ func (f *SocketFactory) cleanChan(pid int) {
f.mu.Lock()
defer f.mu.Unlock()
- delete(f.wait, pid)
-}
-
-// send control command to relay and return associated Pid (or error)
-func fetchPID(rl goridge.Relay) (pid int, err error) {
- if err := sendCommand(rl, PidCommand{Pid: os.Getpid()}); err != nil {
- return 0, err
- }
-
- body, p, err := rl.Receive()
- if !p.HasFlag(goridge.PayloadControl) {
- return 0, fmt.Errorf("unexpected response, `control` header is missing")
- }
-
- link := &PidCommand{}
- if err := json.Unmarshal(body, link); err != nil {
- return 0, err
- }
-
- if link.Parent != os.Getpid() {
- return 0, fmt.Errorf("integrity error, parent process does not match")
- }
-
- return link.Pid, nil
+ delete(f.relays, pid)
}