diff options
author | Valery Piashchynski <[email protected]> | 2021-01-25 22:47:02 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-01-25 22:47:02 +0300 |
commit | 43071e43a0743ff8c7913bba7819952962124355 (patch) | |
tree | e3b61113d3c0d28f972c71592af8b2f708994167 /tests/plugins | |
parent | 5fd1168c687040ca7d72f4727ee1aec753d3f258 (diff) |
Initial commit of the Temporal plugins set
Diffstat (limited to 'tests/plugins')
-rw-r--r-- | tests/plugins/http/handler_test.go | 8 | ||||
-rw-r--r-- | tests/plugins/temporal/.rr.yaml | 22 | ||||
-rw-r--r-- | tests/plugins/temporal/cancel_test.go | 291 | ||||
-rw-r--r-- | tests/plugins/temporal/child_test.go | 84 | ||||
-rw-r--r-- | tests/plugins/temporal/disaster_test.go | 114 | ||||
-rw-r--r-- | tests/plugins/temporal/hp_test.go | 408 | ||||
-rw-r--r-- | tests/plugins/temporal/query_test.go | 66 | ||||
-rw-r--r-- | tests/plugins/temporal/server_test.go | 203 | ||||
-rw-r--r-- | tests/plugins/temporal/signal_test.go | 170 | ||||
-rw-r--r-- | tests/plugins/temporal/worker.php | 33 |
10 files changed, 1395 insertions, 4 deletions
diff --git a/tests/plugins/http/handler_test.go b/tests/plugins/http/handler_test.go index e47dbd44..fc672e36 100644 --- a/tests/plugins/http/handler_test.go +++ b/tests/plugins/http/handler_test.go @@ -156,7 +156,7 @@ func TestHandler_Empty_User_Agent(t *testing.T) { }, nil, p) assert.NoError(t, err) - hs := &http.Server{Addr: ":8088", Handler: h} + hs := &http.Server{Addr: ":19658", Handler: h} defer func() { err := hs.Shutdown(context.Background()) if err != nil { @@ -172,7 +172,7 @@ func TestHandler_Empty_User_Agent(t *testing.T) { }() time.Sleep(time.Millisecond * 10) - req, err := http.NewRequest("GET", "http://localhost:8088?hello=world", nil) + req, err := http.NewRequest("GET", "http://localhost:19658?hello=world", nil) assert.NoError(t, err) req.Header.Add("user-agent", "") @@ -216,7 +216,7 @@ func TestHandler_User_Agent(t *testing.T) { }, nil, p) assert.NoError(t, err) - hs := &http.Server{Addr: ":8088", Handler: h} + hs := &http.Server{Addr: ":25688", Handler: h} defer func() { err := hs.Shutdown(context.Background()) if err != nil { @@ -232,7 +232,7 @@ func TestHandler_User_Agent(t *testing.T) { }() time.Sleep(time.Millisecond * 10) - req, err := http.NewRequest("GET", "http://localhost:8088?hello=world", nil) + req, err := http.NewRequest("GET", "http://localhost:25688?hello=world", nil) assert.NoError(t, err) req.Header.Add("User-Agent", "go-agent") diff --git a/tests/plugins/temporal/.rr.yaml b/tests/plugins/temporal/.rr.yaml new file mode 100644 index 00000000..04d0730d --- /dev/null +++ b/tests/plugins/temporal/.rr.yaml @@ -0,0 +1,22 @@ +# Application configuration +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php worker.php" + +# Workflow and activity mesh service +temporal: + address: "localhost:7233" + activities: + num_workers: 4 + codec: json + debug_level: 2 + +logs: + mode: none + channels: + activities: + mode: none + #workflows: + #mode: none
\ No newline at end of file diff --git a/tests/plugins/temporal/cancel_test.go b/tests/plugins/temporal/cancel_test.go new file mode 100644 index 00000000..0fd3c126 --- /dev/null +++ b/tests/plugins/temporal/cancel_test.go @@ -0,0 +1,291 @@ +package tests + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.temporal.io/api/enums/v1" + "go.temporal.io/api/history/v1" + "go.temporal.io/sdk/client" +) + +func Test_SimpleWorkflowCancel(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "SimpleSignalledWorkflow") + assert.NoError(t, err) + + time.Sleep(time.Millisecond * 500) + err = s.Client().CancelWorkflow(context.Background(), w.GetID(), w.GetRunID()) + assert.NoError(t, err) + + var result interface{} + assert.Error(t, w.Get(context.Background(), &result)) + + we, err := s.Client().DescribeWorkflowExecution(context.Background(), w.GetID(), w.GetRunID()) + assert.NoError(t, err) + + assert.Equal(t, "Canceled", we.WorkflowExecutionInfo.Status.String()) +} + +func Test_CancellableWorkflowScope(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "CancelledScopeWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + var result string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "yes", result) + + s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool { + return event.EventType == enums.EVENT_TYPE_TIMER_CANCELED + }) + + s.AssertNotContainsEvent(t, w, func(event *history.HistoryEvent) bool { + return event.EventType == enums.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED + }) +} + +func Test_CancelledWorkflow(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "CancelledWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + time.Sleep(time.Second) + err = s.Client().CancelWorkflow(context.Background(), w.GetID(), w.GetRunID()) + assert.NoError(t, err) + + var result interface{} + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "CANCELLED", result) +} + +func Test_CancelledWithCompensationWorkflow(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "CancelledWithCompensationWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + time.Sleep(time.Second) + err = s.Client().CancelWorkflow(context.Background(), w.GetID(), w.GetRunID()) + assert.NoError(t, err) + + var result interface{} + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "OK", result) + + e, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "getStatus") + assert.NoError(t, err) + + trace := make([]string, 0) + assert.NoError(t, e.Get(&trace)) + assert.Equal( + t, + []string{ + "yield", + "rollback", + "captured retry", + "captured promise on cancelled", + "START rollback", + "WAIT ROLLBACK", + "RESULT (ROLLBACK)", "DONE rollback", + "COMPLETE rollback", + "result: OK", + }, + trace, + ) +} + +func Test_CancelledNestedWorkflow(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "CancelledNestedWorkflow", + ) + assert.NoError(t, err) + + time.Sleep(time.Second) + + err = s.Client().CancelWorkflow(context.Background(), w.GetID(), w.GetRunID()) + assert.NoError(t, err) + + var result interface{} + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "CANCELLED", result) + + e, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "getStatus") + assert.NoError(t, err) + + trace := make([]string, 0) + assert.NoError(t, e.Get(&trace)) + assert.Equal( + t, + []string{ + "begin", + "first scope", + "second scope", + "close second scope", + "close first scope", + "second scope cancelled", + "first scope cancelled", + "close process", + }, + trace, + ) +} + +func Test_CancelledNSingleScopeWorkflow(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "CancelledSingleScopeWorkflow", + ) + assert.NoError(t, err) + + time.Sleep(time.Second) + + err = s.Client().CancelWorkflow(context.Background(), w.GetID(), w.GetRunID()) + assert.NoError(t, err) + + var result interface{} + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "OK", result) + + e, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "getStatus") + assert.NoError(t, err) + + trace := make([]string, 0) + assert.NoError(t, e.Get(&trace)) + assert.Equal( + t, + []string{ + "start", + "in scope", + "on cancel", + "captured in scope", + "captured in process", + }, + trace, + ) +} + +func Test_CancelledMidflightWorkflow(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "CancelledMidflightWorkflow", + ) + assert.NoError(t, err) + + var result interface{} + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "OK", result) + + e, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "getStatus") + assert.NoError(t, err) + + trace := make([]string, 0) + assert.NoError(t, e.Get(&trace)) + assert.Equal( + t, + []string{ + "start", + "in scope", + "on cancel", + "done cancel", + }, + trace, + ) + + s.AssertNotContainsEvent(t, w, func(event *history.HistoryEvent) bool { + return event.EventType == enums.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED + }) +} + +func Test_CancelSignalledChildWorkflow(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "CancelSignalledChildWorkflow", + ) + assert.NoError(t, err) + + var result interface{} + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "cancelled ok", result) + + e, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "getStatus") + assert.NoError(t, err) + + trace := make([]string, 0) + assert.NoError(t, e.Get(&trace)) + assert.Equal( + t, + []string{ + "start", + "child started", + "child signalled", + "scope cancelled", + "process done", + }, + trace, + ) + + s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool { + return event.EventType == enums.EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED + }) +} diff --git a/tests/plugins/temporal/child_test.go b/tests/plugins/temporal/child_test.go new file mode 100644 index 00000000..49521791 --- /dev/null +++ b/tests/plugins/temporal/child_test.go @@ -0,0 +1,84 @@ +package tests + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "go.temporal.io/sdk/client" +) + +func Test_ExecuteChildWorkflow(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "WithChildWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + var result string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "Child: CHILD HELLO WORLD", result) +} + +func Test_ExecuteChildStubWorkflow(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "WithChildStubWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + var result string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "Child: CHILD HELLO WORLD", result) +} + +func Test_ExecuteChildStubWorkflow_02(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "ChildStubWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + var result []string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, []string{"HELLO WORLD", "UNTYPED"}, result) +} + +func Test_SignalChildViaStubWorkflow(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "SignalChildViaStubWorkflow", + ) + assert.NoError(t, err) + + var result int + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, 8, result) +} diff --git a/tests/plugins/temporal/disaster_test.go b/tests/plugins/temporal/disaster_test.go new file mode 100644 index 00000000..9ca4d018 --- /dev/null +++ b/tests/plugins/temporal/disaster_test.go @@ -0,0 +1,114 @@ +package tests + +import ( + "context" + "os" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.temporal.io/sdk/client" +) + +func Test_WorkerError_DisasterRecovery(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + p, err := os.FindProcess(int(s.workflows.Workers()[0].Pid())) + assert.NoError(t, err) + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "TimerWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + time.Sleep(time.Millisecond * 750) + + // must fully recover with new worker + assert.NoError(t, p.Kill()) + + var result string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "hello world", result) +} + +func Test_WorkerError_DisasterRecovery_Heavy(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + defer func() { + // always restore script + _ = os.Rename("worker.bak", "worker.php") + }() + + // Makes worker pool unable to recover for some time + _ = os.Rename("worker.php", "worker.bak") + + p, err := os.FindProcess(int(s.workflows.Workers()[0].Pid())) + assert.NoError(t, err) + + // must fully recover with new worker + assert.NoError(t, p.Kill()) + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "TimerWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + time.Sleep(time.Millisecond * 750) + + // restore the script and recover activity pool + _ = os.Rename("worker.bak", "worker.php") + + var result string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "hello world", result) +} + +func Test_ActivityError_DisasterRecovery(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + defer func() { + // always restore script + _ = os.Rename("worker.bak", "worker.php") + }() + + // Makes worker pool unable to recover for some time + _ = os.Rename("worker.php", "worker.bak") + + // destroys all workers in activities + for _, wrk := range s.activities.Workers() { + assert.NoError(t, wrk.Kill()) + } + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "SimpleWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + // activity can't complete at this moment + time.Sleep(time.Millisecond * 750) + + // restore the script and recover activity pool + _ = os.Rename("worker.bak", "worker.php") + + var result string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "HELLO WORLD", result) +} diff --git a/tests/plugins/temporal/hp_test.go b/tests/plugins/temporal/hp_test.go new file mode 100644 index 00000000..bceac025 --- /dev/null +++ b/tests/plugins/temporal/hp_test.go @@ -0,0 +1,408 @@ +package tests + +import ( + "context" + "crypto/rand" + "crypto/sha512" + "fmt" + "testing" + "time" + + "go.temporal.io/api/common/v1" + + "github.com/fatih/color" + "github.com/stretchr/testify/assert" + "go.temporal.io/api/enums/v1" + "go.temporal.io/api/history/v1" + "go.temporal.io/sdk/client" +) + +func init() { + color.NoColor = false +} + +func Test_VerifyRegistration(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + assert.Contains(t, s.workflows.WorkflowNames(), "SimpleWorkflow") + + assert.Contains(t, s.activities.ActivityNames(), "SimpleActivity.echo") + assert.Contains(t, s.activities.ActivityNames(), "HeartBeatActivity.doSomething") + + assert.Contains(t, s.activities.ActivityNames(), "SimpleActivity.lower") +} + +func Test_ExecuteSimpleWorkflow_1(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "SimpleWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + var result string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "HELLO WORLD", result) +} + +type User struct { + Name string + Email string +} + +func Test_ExecuteSimpleDTOWorkflow(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "SimpleDTOWorkflow", + User{ + Name: "Antony", + Email: "[email protected]", + }, + ) + assert.NoError(t, err) + + var result struct{ Message string } + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "Hello Antony <[email protected]>", result.Message) +} + +func Test_ExecuteSimpleWorkflowWithSequenceInBatch(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "WorkflowWithSequence", + "Hello World", + ) + assert.NoError(t, err) + + var result string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "OK", result) +} + +func Test_MultipleWorkflowsInSingleWorker(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "SimpleWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + w2, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "TimerWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + var result string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "HELLO WORLD", result) + + assert.NoError(t, w2.Get(context.Background(), &result)) + assert.Equal(t, "hello world", result) +} + +func Test_ExecuteWorkflowWithParallelScopes(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "ParallelScopesWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + var result string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "HELLO WORLD|Hello World|hello world", result) +} + +func Test_Timer(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + start := time.Now() + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "TimerWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + var result string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "hello world", result) + assert.True(t, time.Since(start).Seconds() > 1) + + s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool { + return event.EventType == enums.EVENT_TYPE_TIMER_STARTED + }) +} + +func Test_SideEffect(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "SideEffectWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + var result string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Contains(t, result, "hello world-") + + s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool { + return event.EventType == enums.EVENT_TYPE_MARKER_RECORDED + }) +} + +func Test_EmptyWorkflow(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "EmptyWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + var result int + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, 42, result) +} + +func Test_PromiseChaining(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "ChainedWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + var result string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "result:hello world", result) +} + +func Test_ActivityHeartbeat(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "SimpleHeartbeatWorkflow", + 2, + ) + assert.NoError(t, err) + + time.Sleep(time.Second) + + we, err := s.Client().DescribeWorkflowExecution(context.Background(), w.GetID(), w.GetRunID()) + assert.NoError(t, err) + assert.Len(t, we.PendingActivities, 1) + + act := we.PendingActivities[0] + + assert.Equal(t, `{"value":2}`, string(act.HeartbeatDetails.Payloads[0].Data)) + + var result string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "OK", result) +} + +func Test_FailedActivityHeartbeat(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "FailedHeartbeatWorkflow", + 8, + ) + assert.NoError(t, err) + + time.Sleep(time.Second) + + we, err := s.Client().DescribeWorkflowExecution(context.Background(), w.GetID(), w.GetRunID()) + assert.NoError(t, err) + assert.Len(t, we.PendingActivities, 1) + + act := we.PendingActivities[0] + + assert.Equal(t, `{"value":8}`, string(act.HeartbeatDetails.Payloads[0].Data)) + + var result string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "OK!", result) +} + +func Test_BinaryPayload(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + rnd := make([]byte, 2500) + + _, err := rand.Read(rnd) + assert.NoError(t, err) + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "BinaryWorkflow", + rnd, + ) + assert.NoError(t, err) + + var result string + assert.NoError(t, w.Get(context.Background(), &result)) + + assert.Equal(t, fmt.Sprintf("%x", sha512.Sum512(rnd)), result) +} + +func Test_ContinueAsNew(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "ContinuableWorkflow", + 1, + ) + assert.NoError(t, err) + + time.Sleep(time.Second) + + we, err := s.Client().DescribeWorkflowExecution(context.Background(), w.GetID(), w.GetRunID()) + assert.NoError(t, err) + + assert.Equal(t, "ContinuedAsNew", we.WorkflowExecutionInfo.Status.String()) + + time.Sleep(time.Second) + + // the result of the final workflow + var result string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "OK6", result) +} + +func Test_ActivityStubWorkflow(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "ActivityStubWorkflow", + "hello world", + ) + assert.NoError(t, err) + + // the result of the final workflow + var result []string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, []string{ + "HELLO WORLD", + "invalid method call", + "UNTYPED", + }, result) +} + +func Test_ExecuteProtoWorkflow(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "ProtoPayloadWorkflow", + ) + assert.NoError(t, err) + + var result common.WorkflowExecution + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "updated", result.RunId) + assert.Equal(t, "workflow id", result.WorkflowId) +} + +func Test_SagaWorkflow(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "SagaWorkflow", + ) + assert.NoError(t, err) + + var result string + assert.Error(t, w.Get(context.Background(), &result)) +} diff --git a/tests/plugins/temporal/query_test.go b/tests/plugins/temporal/query_test.go new file mode 100644 index 00000000..8b0caeee --- /dev/null +++ b/tests/plugins/temporal/query_test.go @@ -0,0 +1,66 @@ +package tests + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.temporal.io/sdk/client" +) + +func Test_ListQueries(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "QueryWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + time.Sleep(time.Millisecond * 500) + + v, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "error", -1) + assert.Nil(t, v) + assert.Error(t, err) + + assert.Contains(t, err.Error(), "KnownQueryTypes=[get]") + + var r int + assert.NoError(t, w.Get(context.Background(), &r)) + assert.Equal(t, 0, r) +} + +func Test_GetQuery(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "QueryWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "add", 88) + assert.NoError(t, err) + time.Sleep(time.Millisecond * 500) + + v, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "get", nil) + assert.NoError(t, err) + + var r int + assert.NoError(t, v.Get(&r)) + assert.Equal(t, 88, r) + + assert.NoError(t, w.Get(context.Background(), &r)) + assert.Equal(t, 88, r) +} diff --git a/tests/plugins/temporal/server_test.go b/tests/plugins/temporal/server_test.go new file mode 100644 index 00000000..03e4ed7f --- /dev/null +++ b/tests/plugins/temporal/server_test.go @@ -0,0 +1,203 @@ +package tests + +import ( + "context" + "testing" + + endure "github.com/spiral/endure/pkg/container" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/informer" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/resetter" + "github.com/spiral/roadrunner/v2/plugins/rpc" + "github.com/spiral/roadrunner/v2/plugins/server" + "github.com/spiral/roadrunner/v2/plugins/temporal/activity" + rrClient "github.com/spiral/roadrunner/v2/plugins/temporal/client" + "github.com/spiral/roadrunner/v2/plugins/temporal/workflow" + "go.temporal.io/api/enums/v1" + "go.temporal.io/api/history/v1" + temporalClient "go.temporal.io/sdk/client" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +type TestServer struct { + container endure.Container + temporal rrClient.Temporal + activities *activity.Plugin + workflows *workflow.Plugin +} + +type ConfigOption struct { + Name string + Value interface{} +} + +func NewOption(name string, value interface{}) ConfigOption { + return ConfigOption{Name: name, Value: value} +} + +func NewTestServer(opt ...ConfigOption) *TestServer { + e, err := endure.NewContainer(initLogger(), endure.RetryOnFail(false)) + if err != nil { + panic(err) + } + + t := &rrClient.Plugin{} + a := &activity.Plugin{} + w := &workflow.Plugin{} + + if err := e.Register(initConfig(opt...)); err != nil { + panic(err) + } + + if err := e.Register(&logger.ZapLogger{}); err != nil { + panic(err) + } + if err := e.Register(&resetter.Plugin{}); err != nil { + panic(err) + } + if err := e.Register(&informer.Plugin{}); err != nil { + panic(err) + } + if err := e.Register(&server.Plugin{}); err != nil { + panic(err) + } + if err := e.Register(&rpc.Plugin{}); err != nil { + panic(err) + } + + if err := e.Register(t); err != nil { + panic(err) + } + if err := e.Register(a); err != nil { + panic(err) + } + if err := e.Register(w); err != nil { + panic(err) + } + + if err := e.Init(); err != nil { + panic(err) + } + + errCh, err := e.Serve() + if err != nil { + panic(err) + } + + go func() { + err := <-errCh + er := e.Stop() + if er != nil { + panic(err) + } + }() + + return &TestServer{container: e, temporal: t, activities: a, workflows: w} +} + +func (s *TestServer) Client() temporalClient.Client { + c, err := s.temporal.GetClient() + if err != nil { + panic(err) + } + + return c +} + +func (s *TestServer) MustClose() { + err := s.container.Stop() + if err != nil { + panic(err) + } +} + +func initConfig(opt ...ConfigOption) config.Configurer { + cfg := &config.Viper{} + cfg.Path = ".rr.yaml" + cfg.Prefix = "rr" + + return cfg +} + +func initLogger() *zap.Logger { + cfg := zap.Config{ + Level: zap.NewAtomicLevelAt(zap.ErrorLevel), + Encoding: "console", + EncoderConfig: zapcore.EncoderConfig{ + MessageKey: "message", + LevelKey: "level", + TimeKey: "time", + CallerKey: "caller", + NameKey: "name", + StacktraceKey: "stack", + EncodeLevel: zapcore.CapitalLevelEncoder, + EncodeTime: zapcore.ISO8601TimeEncoder, + EncodeCaller: zapcore.ShortCallerEncoder, + }, + OutputPaths: []string{"stderr"}, + ErrorOutputPaths: []string{"stderr"}, + } + + l, err := cfg.Build(zap.AddCaller()) + if err != nil { + panic(err) + } + + return l +} + +func (s *TestServer) AssertContainsEvent(t *testing.T, w temporalClient.WorkflowRun, assert func(*history.HistoryEvent) bool) { + i := s.Client().GetWorkflowHistory( + context.Background(), + w.GetID(), + w.GetRunID(), + false, + enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT, + ) + + for { + if !i.HasNext() { + t.Error("no more events and no match found") + break + } + + e, err := i.Next() + if err != nil { + t.Error("unable to read history event") + break + } + + if assert(e) { + break + } + } +} + +func (s *TestServer) AssertNotContainsEvent(t *testing.T, w temporalClient.WorkflowRun, assert func(*history.HistoryEvent) bool) { + i := s.Client().GetWorkflowHistory( + context.Background(), + w.GetID(), + w.GetRunID(), + false, + enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT, + ) + + for { + if !i.HasNext() { + break + } + + e, err := i.Next() + if err != nil { + t.Error("unable to read history event") + break + } + + if assert(e) { + t.Error("found unexpected event") + break + } + } +} diff --git a/tests/plugins/temporal/signal_test.go b/tests/plugins/temporal/signal_test.go new file mode 100644 index 00000000..51826287 --- /dev/null +++ b/tests/plugins/temporal/signal_test.go @@ -0,0 +1,170 @@ +package tests + +import ( + "context" + "testing" + "time" + + "github.com/pborman/uuid" + "github.com/stretchr/testify/assert" + "go.temporal.io/api/enums/v1" + "go.temporal.io/api/history/v1" + "go.temporal.io/sdk/client" +) + +func Test_SignalsWithoutSignals(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "SimpleSignalledWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + var result int + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, 0, result) +} + +func Test_SendSignalDuringTimer(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().SignalWithStartWorkflow( + context.Background(), + "signalled-"+uuid.New(), + "add", + 10, + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "SimpleSignalledWorkflow", + ) + assert.NoError(t, err) + + err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "add", -1) + assert.NoError(t, err) + + var result int + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, 9, result) + + s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool { + if event.EventType == enums.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED { + attr := event.Attributes.(*history.HistoryEvent_WorkflowExecutionSignaledEventAttributes) + return attr.WorkflowExecutionSignaledEventAttributes.SignalName == "add" + } + + return false + }) +} + +func Test_SendSignalBeforeCompletingWorkflow(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "SimpleSignalledWorkflowWithSleep", + ) + assert.NoError(t, err) + + // should be around sleep(1) call + time.Sleep(time.Second + time.Millisecond*200) + + err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "add", -1) + assert.NoError(t, err) + + var result int + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, -1, result) + + s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool { + if event.EventType == enums.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED { + attr := event.Attributes.(*history.HistoryEvent_WorkflowExecutionSignaledEventAttributes) + return attr.WorkflowExecutionSignaledEventAttributes.SignalName == "add" + } + + return false + }) +} + +func Test_RuntimeSignal(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().SignalWithStartWorkflow( + context.Background(), + "signalled-"+uuid.New(), + "add", + -1, + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "RuntimeSignalWorkflow", + ) + assert.NoError(t, err) + + var result int + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, -1, result) + + s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool { + if event.EventType == enums.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED { + attr := event.Attributes.(*history.HistoryEvent_WorkflowExecutionSignaledEventAttributes) + return attr.WorkflowExecutionSignaledEventAttributes.SignalName == "add" + } + + return false + }) +} + +func Test_SignalSteps(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "WorkflowWithSignalledSteps", + ) + assert.NoError(t, err) + + err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "begin", true) + assert.NoError(t, err) + + err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "next1", true) + assert.NoError(t, err) + + v, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "value", nil) + assert.NoError(t, err) + + var r int + assert.NoError(t, v.Get(&r)) + assert.Equal(t, 2, r) + + err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "next2", true) + assert.NoError(t, err) + + v, err = s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "value", nil) + assert.NoError(t, err) + + assert.NoError(t, v.Get(&r)) + assert.Equal(t, 3, r) + + var result int + assert.NoError(t, w.Get(context.Background(), &result)) + + // 3 ticks + assert.Equal(t, 3, result) +} diff --git a/tests/plugins/temporal/worker.php b/tests/plugins/temporal/worker.php new file mode 100644 index 00000000..0d0263e7 --- /dev/null +++ b/tests/plugins/temporal/worker.php @@ -0,0 +1,33 @@ +<?php + +declare(strict_types=1); + +require __DIR__ . '/../../vendor/autoload.php'; + +/** + * @param string $dir + * @return array<string> + */ +$getClasses = static function (string $dir): iterable { + $files = glob($dir . '/*.php'); + + foreach ($files as $file) { + yield substr(basename($file), 0, -4); + } +}; + +$factory = \Temporal\WorkerFactory::create(); + +$worker = $factory->newWorker('default'); + +// register all workflows +foreach ($getClasses(__DIR__ . '/../../temporal/Workflow') as $name) { + $worker->registerWorkflowType('Temporal\\Tests\\Workflow\\' . $name); +} + +// register all activity +foreach ($getClasses(__DIR__ . '/../../temporal/Activity') as $name) { + $worker->registerActivityType('Temporal\\Tests\\Activity\\' . $name); +} + +$factory->run(); |