summaryrefslogtreecommitdiff
path: root/server_config.go
blob: 5403ff01faf9af7e917c8c6acb625fd889c38d2b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package roadrunner

import (
	"errors"
	"fmt"
	"github.com/spiral/roadrunner/osutil"
	"net"
	"os"
	"os/exec"
	"strings"
	"sync"
	"syscall"
	"time"
)

// CommandProducer builds a command factory for the given server configuration.
// When ServerConfig.CommandProducer is set, makeCommand returns the function it
// produces instead of deriving a command from ServerConfig.Command.
type CommandProducer func(cfg *ServerConfig) func() *exec.Cmd

// ServerConfig combines factory (relay), pool and cmd configurations.
type ServerConfig struct {
	// Command includes command strings with all the parameters, example: "php worker.php pipes".
	// makeCommand splits it on single spaces; the first element is the executable.
	Command string

	// CommandProducer, when non-nil, takes precedence over Command: makeCommand
	// returns the function it produces instead of building one from Command.
	CommandProducer CommandProducer

	// Relay defines connection method and factory to be used to connect to workers:
	// "pipes", "tcp://:6001", "unix://rr.sock"
	// This config section must not change on re-configuration.
	Relay string

	// RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section
	// must not change on re-configuration.
	RelayTimeout time.Duration

	// Pool defines worker pool configuration, number of workers, timeouts and etc. This config section might change
	// while server is running.
	Pool *Config

	// mu is locked by SetEnv and makeCommand. env holds the variables stored via
	// SetEnv, which GetEnv appends (with upper-cased keys) to the process environment.
	mu  sync.Mutex
	env map[string]string
}

// InitDefaults resets Relay to "pipes" and RelayTimeout to one minute
// (both are assigned unconditionally), allocates an empty Pool config when
// none is present, and then delegates to the pool's own InitDefaults,
// returning its error.
func (cfg *ServerConfig) InitDefaults() error {
	cfg.Relay, cfg.RelayTimeout = "pipes", time.Minute

	if cfg.Pool == nil {
		cfg.Pool = new(Config)
	}

	return cfg.Pool.InitDefaults()
}

// UpscaleDurations reinterprets suspiciously small durations as seconds.
// Any of RelayTimeout, Pool.AllocateTimeout and Pool.DestroyTimeout that is
// below one microsecond is treated as a plain number of seconds (its
// nanosecond count is multiplied by time.Second). Larger values are kept.
// Pool must be non-nil.
func (cfg *ServerConfig) UpscaleDurations() {
	// upscale rewrites *d in place when it is smaller than a microsecond.
	upscale := func(d *time.Duration) {
		if *d < time.Microsecond {
			*d = time.Second * time.Duration(d.Nanoseconds())
		}
	}

	upscale(&cfg.RelayTimeout)
	upscale(&cfg.Pool.AllocateTimeout)
	upscale(&cfg.Pool.DestroyTimeout)
}

// Differs reports whether the relay settings (Relay or RelayTimeout) of new
// are different from the ones in cfg. Command and Pool are not compared.
func (cfg *ServerConfig) Differs(new *ServerConfig) bool {
	sameRelay := cfg.Relay == new.Relay
	sameTimeout := cfg.RelayTimeout == new.RelayTimeout

	return !(sameRelay && sameTimeout)
}

// SetEnv stores environment variable k with value v under cfg.mu, creating
// the underlying map on first use. The key is stored as given; GetEnv
// upper-cases it when the environment list is built. The value is never
// modified.
func (cfg *ServerConfig) SetEnv(k, v string) {
	cfg.mu.Lock()
	defer cfg.mu.Unlock()

	if cfg.env != nil {
		cfg.env[k] = v
		return
	}

	// First variable: allocate the map together with its initial entry.
	cfg.env = map[string]string{k: v}
}

// GetEnv returns the environment for worker commands: the current process
// environment, followed by RR_RELAY=<cfg.Relay>, followed by every variable
// stored in cfg.env with its key upper-cased. The cfg.env entries are added
// in map iteration order, which is unspecified.
//
// NOTE: cfg.env is read here without acquiring cfg.mu.
func (cfg *ServerConfig) GetEnv() (env []string) {
	env = os.Environ()
	env = append(env, fmt.Sprintf("RR_RELAY=%s", cfg.Relay))

	for name, value := range cfg.env {
		entry := fmt.Sprintf("%s=%s", strings.ToUpper(name), value)
		env = append(env, entry)
	}

	return env
}

// makeCommand returns a command provider based on the configured options.
// cfg.mu is held while the provider is selected. When CommandProducer is set
// its result is returned as-is. Otherwise Command is split on single spaces
// (first element is the executable, the rest are arguments) and the returned
// function creates a fresh *exec.Cmd on every call, passes it through
// osutil.IsolateProcess and assigns cfg.GetEnv() as its environment.
func (cfg *ServerConfig) makeCommand() func() *exec.Cmd {
	cfg.mu.Lock()
	defer cfg.mu.Unlock()

	if producer := cfg.CommandProducer; producer != nil {
		return producer(cfg)
	}

	argv := strings.Split(cfg.Command, " ")
	name, args := argv[0], argv[1:]

	return func() *exec.Cmd {
		c := exec.Command(name, args...)
		osutil.IsolateProcess(c)

		// The environment is computed at call time, not when the provider is built.
		c.Env = cfg.GetEnv()

		return c
	}
}

// makeFactory creates the worker factory selected by cfg.Relay.
// "pipes" and "pipe" yield a pipe factory. Any other value must have the
// form "<network>://<address>" (exactly one "://" separator); a listener is
// opened on it and wrapped in a socket factory that uses cfg.RelayTimeout.
// For the "unix" network an existing file at the address is unlinked first.
// Returns an error for a malformed DSN or when unlink/listen fails.
func (cfg *ServerConfig) makeFactory() (Factory, error) {
	switch cfg.Relay {
	case "pipes", "pipe":
		return NewPipeFactory(), nil
	}

	parts := strings.Split(cfg.Relay, "://")
	if len(parts) != 2 {
		return nil, errors.New("invalid relay DSN (pipes, tcp://:6001, unix://rr.sock)")
	}
	network, address := parts[0], parts[1]

	// Remove an existing socket file so that it does not get in the way of listening.
	if network == "unix" && fileExists(address) {
		if err := syscall.Unlink(address); err != nil {
			return nil, err
		}
	}

	listener, err := net.Listen(network, address)
	if err != nil {
		return nil, err
	}

	return NewSocketFactory(listener, cfg.RelayTimeout), nil
}

// fileExists reports whether filename refers to an existing entry that is
// not a directory. It is used to decide whether a path has to be removed
// before it is reused.
//
// It returns false when os.Stat fails for any reason, not only when the file
// is missing: on errors such as ENOTDIR, EINVAL or EACCES os.Stat returns a
// nil FileInfo, so the info must not be inspected.
func fileExists(filename string) bool {
	info, err := os.Stat(filename)
	if err != nil {
		return false
	}

	return !info.IsDir()
}