1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
|
package tests
import (
"context"
"os"
"testing"
"time"
"github.com/stretchr/testify/assert"
"go.temporal.io/sdk/client"
)
// Test_WorkerError_DisasterRecovery starts a "TimerWorkflow" execution, kills
// the OS process of the first workflow worker while that execution is in
// flight, and checks that the workflow still completes with the expected
// result (i.e. the pool must fully recover with a new worker).
func Test_WorkerError_DisasterRecovery(t *testing.T) {
	s := NewTestServer()
	defer s.MustClose()

	workers := s.workflows.Workers()
	// Guard the index below: an empty pool would otherwise panic the test binary.
	if !assert.NotEmpty(t, workers) {
		return
	}

	p, err := os.FindProcess(int(workers[0].Pid()))
	// p is not usable when err != nil; stop instead of dereferencing it.
	if !assert.NoError(t, err) {
		return
	}

	w, err := s.Client().ExecuteWorkflow(
		context.Background(),
		client.StartWorkflowOptions{
			TaskQueue: "default",
		},
		"TimerWorkflow",
		"Hello World",
	)
	// w is nil on error; calling w.Get below would panic.
	if !assert.NoError(t, err) {
		return
	}

	// Let the execution get underway (presumably picked up by the worker)
	// before the worker process is killed.
	time.Sleep(time.Millisecond * 750)

	// must fully recover with new worker
	assert.NoError(t, p.Kill())

	// Bound the wait so a failed recovery fails this test instead of hanging
	// the whole test binary until the global `go test` timeout.
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	var result string
	assert.NoError(t, w.Get(ctx, &result))
	assert.Equal(t, "hello world", result)
}
// Test_WorkerError_DisasterRecovery_Heavy renames the worker script
// (worker.php -> worker.bak) so the worker pool is unable to recover for some
// time, kills the first workflow worker, and starts a "TimerWorkflow"
// execution during that window. After ~750ms the script is restored, and the
// workflow must still complete with the expected result.
func Test_WorkerError_DisasterRecovery_Heavy(t *testing.T) {
	s := NewTestServer()
	defer s.MustClose()
	defer func() {
		// Always restore the script, even if the test bails out early. The
		// error is ignored on purpose: on the normal path the script has
		// already been restored below and worker.bak no longer exists.
		_ = os.Rename("worker.bak", "worker.php")
	}()

	// Make the worker pool unable to recover for some time. If the rename
	// fails, the disaster scenario is not being exercised, so fail loudly
	// instead of passing vacuously.
	if !assert.NoError(t, os.Rename("worker.php", "worker.bak")) {
		return
	}

	workers := s.workflows.Workers()
	// Guard the index below: an empty pool would otherwise panic the test binary.
	if !assert.NotEmpty(t, workers) {
		return
	}

	p, err := os.FindProcess(int(workers[0].Pid()))
	// p is not usable when err != nil; stop instead of dereferencing it.
	if !assert.NoError(t, err) {
		return
	}

	// must fully recover with new worker
	assert.NoError(t, p.Kill())

	w, err := s.Client().ExecuteWorkflow(
		context.Background(),
		client.StartWorkflowOptions{
			TaskQueue: "default",
		},
		"TimerWorkflow",
		"Hello World",
	)
	// w is nil on error; calling w.Get below would panic.
	if !assert.NoError(t, err) {
		return
	}

	// Keep the pool broken for a while before restoring it.
	time.Sleep(time.Millisecond * 750)

	// Restore the script so the workflow pool can recover. If this fails,
	// the Get below could only time out, so stop here.
	if !assert.NoError(t, os.Rename("worker.bak", "worker.php")) {
		return
	}

	// Bound the wait so a failed recovery fails this test instead of hanging
	// the whole test binary until the global `go test` timeout.
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	var result string
	assert.NoError(t, w.Get(ctx, &result))
	assert.Equal(t, "hello world", result)
}
// Test_ActivityError_DisasterRecovery renames the worker script
// (worker.php -> worker.bak) so the worker pool is unable to recover for some
// time, kills every activity worker, and starts a "SimpleWorkflow" execution
// whose activity cannot complete during that window. After ~750ms the script
// is restored, and the workflow must still complete with the expected result.
func Test_ActivityError_DisasterRecovery(t *testing.T) {
	s := NewTestServer()
	defer s.MustClose()
	defer func() {
		// Always restore the script, even if the test bails out early. The
		// error is ignored on purpose: on the normal path the script has
		// already been restored below and worker.bak no longer exists.
		_ = os.Rename("worker.bak", "worker.php")
	}()

	// Make the worker pool unable to recover for some time. If the rename
	// fails, the disaster scenario is not being exercised, so fail loudly
	// instead of passing vacuously.
	if !assert.NoError(t, os.Rename("worker.php", "worker.bak")) {
		return
	}

	// destroys all workers in activities
	for _, wrk := range s.activities.Workers() {
		assert.NoError(t, wrk.Kill())
	}

	w, err := s.Client().ExecuteWorkflow(
		context.Background(),
		client.StartWorkflowOptions{
			TaskQueue: "default",
		},
		"SimpleWorkflow",
		"Hello World",
	)
	// w is nil on error; calling w.Get below would panic.
	if !assert.NoError(t, err) {
		return
	}

	// activity can't complete at this moment
	time.Sleep(time.Millisecond * 750)

	// Restore the script so the activity pool can recover. If this fails,
	// the Get below could only time out, so stop here.
	if !assert.NoError(t, os.Rename("worker.bak", "worker.php")) {
		return
	}

	// Bound the wait so a failed recovery fails this test instead of hanging
	// the whole test binary until the global `go test` timeout.
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	var result string
	assert.NoError(t, w.Get(ctx, &result))
	assert.Equal(t, "HELLO WORLD", result)
}
|