1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
|
package factory
import (
"errors"
"fmt"
"os"
"os/exec"
"strings"
"time"
"github.com/spiral/roadrunner/v2"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/util"
)
// AppConfig combines factory, pool and cmd configurations. App.Init fills it
// from the "app" key of the config provider.
type AppConfig struct {
// Command is the worker command line. NewCmd splits it on spaces into the
// executable (first element) and its arguments (the rest).
Command string
// User, when non-empty, is handed to util.ExecuteFromUser for every command
// built by NewCmd.
User string
// Group is not referenced by the code visible in this file.
// NOTE(review): presumably the group the worker should run as; confirm.
Group string
// Env is not referenced by the code visible in this file; setEnv uses the Env
// value passed to NewCmd instead.
// NOTE(review): presumably extra worker environment variables; confirm.
Env Env
// Relay defines connection method and factory to be used to connect to workers:
// "pipes", "tcp://:6001", "unix://rr.sock"
// An empty value is replaced with "pipes" by Init.
// This config section must not change on re-configuration.
Relay string
// RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section
// must not change on re-configuration.
RelayTimeout time.Duration
}
// App builds worker commands and worker factories from the "app" configuration
// section. Init must be called before the other methods so that cfg is loaded.
type App struct {
// cfg is the configuration loaded by Init.
cfg AppConfig
// configProvider is the provider passed to Init, kept for later use.
configProvider config.Provider
}
// Init remembers the given config provider and loads the "app" section into a
// freshly zeroed AppConfig. When no relay is configured the "pipes" relay is
// selected. An unmarshalling error is returned to the caller unchanged.
func (app *App) Init(provider config.Provider) error {
	app.configProvider = provider
	app.cfg = AppConfig{}

	if err := provider.UnmarshalKey("app", &app.cfg); err != nil {
		return err
	}

	// Fall back to the default relay when none was configured.
	if len(app.cfg.Relay) == 0 {
		app.cfg.Relay = "pipes"
	}

	return nil
}
// NewCmd returns a function that builds a new *exec.Cmd for the configured
// worker command each time it is called.
//
// The configured command is split on whitespace: the first field is the
// executable, the remaining fields are its arguments. An empty (or
// whitespace-only) command is rejected with an error instead of producing a
// command with an empty executable name.
//
// Every command built by the returned function is passed through
// util.IsolateProcess and gets the environment produced by setEnv(env). If a
// user is configured and util.ExecuteFromUser fails, the returned function
// yields nil, so callers must be prepared for a nil command.
func (app *App) NewCmd(env Env) (func() *exec.Cmd, error) {
	// strings.Fields drops empty fields, so repeated, leading or trailing
	// whitespace never turns into "" arguments or an empty executable name.
	cmdArgs := strings.Fields(app.cfg.Command)
	if len(cmdArgs) == 0 {
		return nil, errors.New("empty worker command")
	}

	return func() *exec.Cmd {
		cmd := exec.Command(cmdArgs[0], cmdArgs[1:]...)
		util.IsolateProcess(cmd)

		// If a user is configured, execute the worker from that particular user.
		if app.cfg.User != "" {
			if err := util.ExecuteFromUser(cmd, app.cfg.User); err != nil {
				// The closure has no error result; nil signals the failure.
				return nil
			}
		}

		cmd.Env = app.setEnv(env)
		return cmd
	}, nil
}
// NewFactory creates the worker factory selected by the configured relay.
//
// An empty relay or "pipes" yields a pipe factory. Any other value must be a
// DSN of the form "<scheme>://<address>" with scheme "tcp" or "unix"
// (for example "tcp://:6001" or "unix://file.sock"); a listener is then opened
// for it and wrapped in a socket server that uses the configured RelayTimeout.
//
// The DSN is fully validated before the listener is created, so an
// unsupported scheme never leaves an open listener behind.
//
// TODO: ENV unused (carried over from the original code).
func (app *App) NewFactory() (roadrunner.Factory, error) {
	if app.cfg.Relay == "" || app.cfg.Relay == "pipes" {
		return roadrunner.NewPipeFactory(), nil
	}

	dsn := strings.Split(app.cfg.Relay, "://")
	if len(dsn) != 2 {
		return nil, errors.New("invalid DSN (tcp://:6001, unix://file.sock)")
	}

	// Reject unknown schemes before any socket is opened.
	switch dsn[0] {
	case "unix", "tcp":
		// sockets group, handled below
	default:
		return nil, errors.New("invalid DSN (tcp://:6001, unix://file.sock)")
	}

	lsn, err := util.CreateListener(app.cfg.Relay)
	if err != nil {
		return nil, err
	}

	return roadrunner.NewSocketServer(lsn, app.cfg.RelayTimeout), nil
}
// setEnv builds the environment for a worker process: the current process
// environment, followed by RR_RELAY set to the configured relay, followed by
// one NAME=value entry per element of e with the name upper-cased.
func (app *App) setEnv(e Env) []string {
	inherited := os.Environ()

	vars := make([]string, 0, len(inherited)+1+len(e))
	vars = append(vars, inherited...)
	vars = append(vars, "RR_RELAY="+app.cfg.Relay)

	for name, value := range e {
		entry := fmt.Sprintf("%s=%s", strings.ToUpper(name), value)
		vars = append(vars, entry)
	}

	return vars
}
|