summaryrefslogtreecommitdiff
path: root/socket_factory.go
diff options
context:
space:
mode:
Diffstat (limited to 'socket_factory.go')
-rw-r--r--socket_factory.go140
1 files changed, 0 insertions, 140 deletions
diff --git a/socket_factory.go b/socket_factory.go
deleted file mode 100644
index f652e056..00000000
--- a/socket_factory.go
+++ /dev/null
@@ -1,140 +0,0 @@
-package roadrunner
-
-import (
- "fmt"
- "net"
- "os/exec"
- "sync"
- "time"
-
- "github.com/pkg/errors"
- "github.com/spiral/goridge/v2"
-)
-
-// SocketFactory connects to external workers using socket server.
-type SocketFactory struct {
- // listens for incoming connections from underlying processes
- ls net.Listener
-
- // relay connection timeout
- tout time.Duration
-
- // protects socket mapping
- mu sync.Mutex
-
- // sockets which are waiting for process association
- relays map[int]chan *goridge.SocketRelay
-}
-
-// NewSocketFactory returns SocketFactory attached to a given socket lsn.
-// tout specifies for how long factory should serve for incoming relay connection
-func NewSocketFactory(ls net.Listener, tout time.Duration) *SocketFactory {
- f := &SocketFactory{
- ls: ls,
- tout: tout,
- relays: make(map[int]chan *goridge.SocketRelay),
- }
-
- go f.listen()
-
- return f
-}
-
-// SpawnWorker creates worker and connects it to appropriate relay or returns error
-func (f *SocketFactory) SpawnWorker(cmd *exec.Cmd) (w *Worker, err error) {
- if w, err = newWorker(cmd); err != nil {
- return nil, err
- }
-
- if err := w.start(); err != nil {
- return nil, errors.Wrap(err, "process error")
- }
-
- rl, err := f.findRelay(w, f.tout)
- if err != nil {
- go func(w *Worker) {
- err := w.Kill()
- if err != nil {
- fmt.Println(fmt.Errorf("error killing the worker %v", err))
- }
- }(w)
-
- if wErr := w.Wait(); wErr != nil {
- if _, ok := wErr.(*exec.ExitError); ok {
- err = errors.Wrap(wErr, err.Error())
- } else {
- err = wErr
- }
- }
-
- return nil, errors.Wrap(err, "unable to connect to worker")
- }
-
- w.rl = rl
- w.state.set(StateReady)
-
- return w, nil
-}
-
-// Close socket factory and underlying socket connection.
-func (f *SocketFactory) Close() error {
- return f.ls.Close()
-}
-
-// listens for incoming socket connections
-func (f *SocketFactory) listen() {
- for {
- conn, err := f.ls.Accept()
- if err != nil {
- return
- }
-
- rl := goridge.NewSocketRelay(conn)
- if pid, err := fetchPID(rl); err == nil {
- f.relayChan(pid) <- rl
- }
- }
-}
-
-// waits for worker to connect over socket and returns associated relay of timeout
-func (f *SocketFactory) findRelay(w *Worker, tout time.Duration) (*goridge.SocketRelay, error) {
- timer := time.NewTimer(tout)
- for {
- select {
- case rl := <-f.relayChan(*w.Pid):
- timer.Stop()
- f.cleanChan(*w.Pid)
- return rl, nil
-
- case <-timer.C:
- return nil, fmt.Errorf("relay timeout")
-
- case <-w.waitDone:
- timer.Stop()
- f.cleanChan(*w.Pid)
- return nil, fmt.Errorf("worker is gone")
- }
- }
-}
-
-// chan to store relay associated with specific Pid
-func (f *SocketFactory) relayChan(pid int) chan *goridge.SocketRelay {
- f.mu.Lock()
- defer f.mu.Unlock()
-
- rl, ok := f.relays[pid]
- if !ok {
- f.relays[pid] = make(chan *goridge.SocketRelay)
- return f.relays[pid]
- }
-
- return rl
-}
-
-// deletes relay chan associated with specific Pid
-func (f *SocketFactory) cleanChan(pid int) {
- f.mu.Lock()
- defer f.mu.Unlock()
-
- delete(f.relays, pid)
-}