1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
|
package server
import (
"context"
"time"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/payload"
"github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/server"
)
// Names shared by the test plugin in this file.
const (
	// ConfigSection is the configuration key that Serve passes to
	// UnmarshalKey to load the server config.
	ConfigSection = "server"

	// Response is the body of the test payload sent by Serve, and also the
	// body Serve expects to receive back.
	Response = "test"
)
// testPoolConfig is the pool configuration that Serve hands to NewWorkerPool
// when it exercises the worker pool.
var testPoolConfig = &pool.Config{
	NumWorkers:      10,
	MaxJobs:         100,
	AllocateTimeout: 10 * time.Second,
	DestroyTimeout:  10 * time.Second,
	Supervisor: &pool.SupervisorConfig{
		WatchTick:       60 * time.Second,
		TTL:             1000 * time.Second,
		IdleTTL:         10 * time.Second,
		ExecTTL:         10 * time.Second,
		MaxWorkerMemory: 1000,
	},
}
// Foo is a test plugin that checks a worker factory: Init stores the
// dependencies, Serve runs the checks, and Stop destroys the pool that Serve
// created.
type Foo struct {
// configProvider is stored by Init and read by Serve to unmarshal the
// ConfigSection key.
configProvider config.Configurer
// wf is the worker factory stored by Init; Serve uses it to obtain a command
// factory, a single worker and a worker pool.
wf server.Server
// pool is assigned by Serve and destroyed by Stop.
pool pool.Pool
}
// Init records the configuration provider p and the worker factory
// workerFactory on the plugin for later use by Serve. It performs no
// validation and always returns nil.
func (f *Foo) Init(p config.Configurer, workerFactory server.Server) error {
	f.configProvider, f.wf = p, workerFactory
	return nil
}
// Serve runs a self-check of the worker factory and reports the outcome on
// the returned channel.
//
// The steps are, in order: unmarshal the ConfigSection key into a
// server.Config, request a command factory, create one worker and send the
// test payload through it, stop that worker, create a worker pool from
// testPoolConfig and send the same payload through the pool. Both executions
// must return a body equal to Response.
//
// The first failing step sends its error on the channel and ends the check.
// The channel has capacity one, so that send never blocks. When every step
// passes the channel is returned empty. The channel is never closed.
//
// The pool is stored in f.pool only after it has been created successfully,
// so that Stop can destroy it.
func (f *Foo) Serve() chan error {
	const op = errors.Op("serve")

	errCh := make(chan error, 1)
	// fail reports err on the result channel and hands the channel back, so
	// that every failure path is a single return statement.
	fail := func(err error) chan error {
		errCh <- err
		return errCh
	}

	// test payload for echo
	r := payload.Payload{
		Context: nil,
		Body:    []byte(Response),
	}

	conf := &server.Config{}
	if err := f.configProvider.UnmarshalKey(ConfigSection, conf); err != nil {
		return fail(err)
	}

	// test CMDFactory
	cmd, err := f.wf.CmdFactory(nil)
	if err != nil {
		return fail(err)
	}
	if cmd == nil {
		return fail(errors.E(op, errors.Str("command is nil")))
	}

	// test worker creation
	w, err := f.wf.NewWorker(context.Background(), nil)
	if err != nil {
		return fail(err)
	}

	// test that our worker is functional
	sw := worker.From(w)
	rsp, err := sw.Exec(r)
	if err != nil {
		// Best-effort cleanup so the worker does not outlive the failed check;
		// the Exec error is the one worth reporting.
		_ = sw.Stop()
		return fail(err)
	}
	if string(rsp.Body) != Response {
		// Same cleanup as above: the worker is not needed past this point.
		_ = sw.Stop()
		// A single error value, so that both the description and the received
		// body end up in the reported message.
		return fail(errors.E(op, errors.Errorf("response from worker is wrong, response: %s", rsp.Body)))
	}

	// should not be errors
	if err = sw.Stop(); err != nil {
		return fail(err)
	}

	// test pool
	p, err := f.wf.NewWorkerPool(context.Background(), testPoolConfig, nil)
	if err != nil {
		return fail(err)
	}
	// Assign only on success, so f.pool is never set from a failed call.
	f.pool = p

	// test pool execution
	rsp, err = f.pool.Exec(r)
	if err != nil {
		return fail(err)
	}

	// echo of the "test" should be -> test
	if string(rsp.Body) != Response {
		return fail(errors.E(op, errors.Errorf("response from worker is wrong, response: %s", rsp.Body)))
	}

	return errCh
}
// Stop destroys the worker pool stored in f.pool, using a background context.
// It is a no-op when no pool has been stored, which is the case whenever Serve
// failed before the pool was created (or was never called). It always returns
// nil.
func (f *Foo) Stop() error {
	// Calling Destroy on a nil interface value would panic.
	if f.pool == nil {
		return nil
	}
	f.pool.Destroy(context.Background())
	return nil
}
|