summaryrefslogtreecommitdiff
path: root/tests/plugins/temporal/disaster_test.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-01-26 11:52:03 +0300
committerGitHub <[email protected]>2021-01-26 11:52:03 +0300
commite2266b80db47444ba5858c736833a8a81b1361ad (patch)
tree37e06810352752f88032f7d0eadb554fa18b98da /tests/plugins/temporal/disaster_test.go
parentfae4711e3548bfd2e34f13aabfaab6a5b4e317c6 (diff)
parenta392d962508e1bc9e497c8c4ef021425bc2c67c2 (diff)
Merge pull request #502 from spiral/plugin/temporalv2.0.0-beta12
plugin(temporal): Add temporal plugins set to the RR2
Diffstat (limited to 'tests/plugins/temporal/disaster_test.go')
-rw-r--r--tests/plugins/temporal/disaster_test.go114
1 files changed, 114 insertions, 0 deletions
diff --git a/tests/plugins/temporal/disaster_test.go b/tests/plugins/temporal/disaster_test.go
new file mode 100644
index 00000000..9ca4d018
--- /dev/null
+++ b/tests/plugins/temporal/disaster_test.go
@@ -0,0 +1,114 @@
+package tests
+
+import (
+ "context"
+ "os"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+ "go.temporal.io/sdk/client"
+)
+
+func Test_WorkerError_DisasterRecovery(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ p, err := os.FindProcess(int(s.workflows.Workers()[0].Pid()))
+ assert.NoError(t, err)
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "TimerWorkflow",
+ "Hello World",
+ )
+ assert.NoError(t, err)
+
+ time.Sleep(time.Millisecond * 750)
+
+ // must fully recover with new worker
+ assert.NoError(t, p.Kill())
+
+ var result string
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, "hello world", result)
+}
+
+func Test_WorkerError_DisasterRecovery_Heavy(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ defer func() {
+ // always restore script
+ _ = os.Rename("worker.bak", "worker.php")
+ }()
+
+ // Makes worker pool unable to recover for some time
+ _ = os.Rename("worker.php", "worker.bak")
+
+ p, err := os.FindProcess(int(s.workflows.Workers()[0].Pid()))
+ assert.NoError(t, err)
+
+ // must fully recover with new worker
+ assert.NoError(t, p.Kill())
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "TimerWorkflow",
+ "Hello World",
+ )
+ assert.NoError(t, err)
+
+ time.Sleep(time.Millisecond * 750)
+
+ // restore the script and recover activity pool
+ _ = os.Rename("worker.bak", "worker.php")
+
+ var result string
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, "hello world", result)
+}
+
+func Test_ActivityError_DisasterRecovery(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ defer func() {
+ // always restore script
+ _ = os.Rename("worker.bak", "worker.php")
+ }()
+
+ // Makes worker pool unable to recover for some time
+ _ = os.Rename("worker.php", "worker.bak")
+
+ // destroys all workers in activities
+ for _, wrk := range s.activities.Workers() {
+ assert.NoError(t, wrk.Kill())
+ }
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "SimpleWorkflow",
+ "Hello World",
+ )
+ assert.NoError(t, err)
+
+ // activity can't complete at this moment
+ time.Sleep(time.Millisecond * 750)
+
+ // restore the script and recover activity pool
+ _ = os.Rename("worker.bak", "worker.php")
+
+ var result string
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, "HELLO WORLD", result)
+}