diff options
author | Valery Piashchynski <[email protected]> | 2021-01-27 13:56:28 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-01-27 13:56:28 +0300 |
commit | 744c2b0c86b88f77e681f8660bf3a476e83711b8 (patch) | |
tree | f7af7d7d494d1f5ca272af1ad0b978fe44d685a9 /tests/plugins | |
parent | e2266b80db47444ba5858c736833a8a81b1361ad (diff) |
Move temporal plugin to the temporal repository
Diffstat (limited to 'tests/plugins')
-rw-r--r-- | tests/plugins/temporal/.rr.yaml | 22 | ||||
-rw-r--r-- | tests/plugins/temporal/cancel_test.go | 291 | ||||
-rw-r--r-- | tests/plugins/temporal/child_test.go | 84 | ||||
-rw-r--r-- | tests/plugins/temporal/disaster_test.go | 114 | ||||
-rw-r--r-- | tests/plugins/temporal/hp_test.go | 408 | ||||
-rw-r--r-- | tests/plugins/temporal/query_test.go | 66 | ||||
-rw-r--r-- | tests/plugins/temporal/server_test.go | 198 | ||||
-rw-r--r-- | tests/plugins/temporal/signal_test.go | 170 | ||||
-rw-r--r-- | tests/plugins/temporal/worker.php | 33 |
9 files changed, 0 insertions, 1386 deletions
diff --git a/tests/plugins/temporal/.rr.yaml b/tests/plugins/temporal/.rr.yaml deleted file mode 100644 index 04d0730d..00000000 --- a/tests/plugins/temporal/.rr.yaml +++ /dev/null @@ -1,22 +0,0 @@ -# Application configuration -rpc: - listen: tcp://127.0.0.1:6001 - -server: - command: "php worker.php" - -# Workflow and activity mesh service -temporal: - address: "localhost:7233" - activities: - num_workers: 4 - codec: json - debug_level: 2 - -logs: - mode: none - channels: - activities: - mode: none - #workflows: - #mode: none
\ No newline at end of file diff --git a/tests/plugins/temporal/cancel_test.go b/tests/plugins/temporal/cancel_test.go deleted file mode 100644 index 0fd3c126..00000000 --- a/tests/plugins/temporal/cancel_test.go +++ /dev/null @@ -1,291 +0,0 @@ -package tests - -import ( - "context" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "go.temporal.io/api/enums/v1" - "go.temporal.io/api/history/v1" - "go.temporal.io/sdk/client" -) - -func Test_SimpleWorkflowCancel(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "SimpleSignalledWorkflow") - assert.NoError(t, err) - - time.Sleep(time.Millisecond * 500) - err = s.Client().CancelWorkflow(context.Background(), w.GetID(), w.GetRunID()) - assert.NoError(t, err) - - var result interface{} - assert.Error(t, w.Get(context.Background(), &result)) - - we, err := s.Client().DescribeWorkflowExecution(context.Background(), w.GetID(), w.GetRunID()) - assert.NoError(t, err) - - assert.Equal(t, "Canceled", we.WorkflowExecutionInfo.Status.String()) -} - -func Test_CancellableWorkflowScope(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "CancelledScopeWorkflow", - "Hello World", - ) - assert.NoError(t, err) - - var result string - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, "yes", result) - - s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool { - return event.EventType == enums.EVENT_TYPE_TIMER_CANCELED - }) - - s.AssertNotContainsEvent(t, w, func(event *history.HistoryEvent) bool { - return event.EventType == enums.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED - }) -} - -func Test_CancelledWorkflow(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "CancelledWorkflow", - "Hello World", - ) - assert.NoError(t, err) - - time.Sleep(time.Second) - err = s.Client().CancelWorkflow(context.Background(), w.GetID(), w.GetRunID()) - assert.NoError(t, err) - - var result interface{} - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, "CANCELLED", result) -} - -func Test_CancelledWithCompensationWorkflow(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "CancelledWithCompensationWorkflow", - "Hello World", - ) - assert.NoError(t, err) - - time.Sleep(time.Second) - err = s.Client().CancelWorkflow(context.Background(), w.GetID(), w.GetRunID()) - assert.NoError(t, err) - - var result interface{} - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, "OK", result) - - e, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "getStatus") - assert.NoError(t, err) - - trace := make([]string, 0) - assert.NoError(t, e.Get(&trace)) - assert.Equal( - t, - []string{ - "yield", - "rollback", - "captured retry", - "captured promise on cancelled", - "START rollback", - "WAIT ROLLBACK", - "RESULT (ROLLBACK)", "DONE rollback", - "COMPLETE rollback", - "result: OK", - }, - trace, - ) -} - -func Test_CancelledNestedWorkflow(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "CancelledNestedWorkflow", - ) - assert.NoError(t, err) - - time.Sleep(time.Second) - - err = s.Client().CancelWorkflow(context.Background(), w.GetID(), w.GetRunID()) - assert.NoError(t, err) - - var result interface{} - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, "CANCELLED", result) - - e, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "getStatus") - assert.NoError(t, err) - - trace := make([]string, 0) - assert.NoError(t, e.Get(&trace)) - assert.Equal( - t, - []string{ - "begin", - "first scope", - "second scope", - "close second scope", - "close first scope", - "second scope cancelled", - "first scope cancelled", - "close process", - }, - trace, - ) -} - -func Test_CancelledNSingleScopeWorkflow(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "CancelledSingleScopeWorkflow", - ) - assert.NoError(t, err) - - time.Sleep(time.Second) - - err = s.Client().CancelWorkflow(context.Background(), w.GetID(), w.GetRunID()) - assert.NoError(t, err) - - var result interface{} - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, "OK", result) - - e, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "getStatus") - assert.NoError(t, err) - - trace := make([]string, 0) - assert.NoError(t, e.Get(&trace)) - assert.Equal( - t, - []string{ - "start", - "in scope", - "on cancel", - "captured in scope", - "captured in process", - }, - trace, - ) -} - -func Test_CancelledMidflightWorkflow(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "CancelledMidflightWorkflow", - ) - assert.NoError(t, err) - - var result interface{} - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, "OK", result) - - e, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "getStatus") - assert.NoError(t, err) - - trace := make([]string, 0) - assert.NoError(t, e.Get(&trace)) - assert.Equal( - t, - []string{ - "start", - "in scope", - "on cancel", - "done cancel", - }, - trace, - ) - - s.AssertNotContainsEvent(t, w, func(event *history.HistoryEvent) bool { - return event.EventType == enums.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED - }) -} - -func Test_CancelSignalledChildWorkflow(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "CancelSignalledChildWorkflow", - ) - assert.NoError(t, err) - - var result interface{} - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, "cancelled ok", result) - - e, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "getStatus") - assert.NoError(t, err) - - trace := make([]string, 0) - assert.NoError(t, e.Get(&trace)) - assert.Equal( - t, - []string{ - "start", - "child started", - "child signalled", - "scope cancelled", - "process done", - }, - trace, - ) - - s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool { - return event.EventType == enums.EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED - }) -} diff --git a/tests/plugins/temporal/child_test.go b/tests/plugins/temporal/child_test.go deleted file mode 100644 index 49521791..00000000 --- a/tests/plugins/temporal/child_test.go +++ /dev/null @@ -1,84 +0,0 @@ -package tests - -import ( - "context" - "testing" - - "github.com/stretchr/testify/assert" - "go.temporal.io/sdk/client" -) - -func Test_ExecuteChildWorkflow(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "WithChildWorkflow", - "Hello World", - ) - assert.NoError(t, err) - - var result string - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, "Child: CHILD HELLO WORLD", result) -} - -func Test_ExecuteChildStubWorkflow(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "WithChildStubWorkflow", - "Hello World", - ) - assert.NoError(t, err) - - var result string - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, "Child: CHILD HELLO WORLD", result) -} - -func Test_ExecuteChildStubWorkflow_02(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "ChildStubWorkflow", - "Hello World", - ) - assert.NoError(t, err) - - var result []string - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, []string{"HELLO WORLD", "UNTYPED"}, result) -} - -func Test_SignalChildViaStubWorkflow(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "SignalChildViaStubWorkflow", - ) - assert.NoError(t, err) - - var result int - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, 8, result) -} diff --git a/tests/plugins/temporal/disaster_test.go b/tests/plugins/temporal/disaster_test.go deleted file mode 100644 index 9ca4d018..00000000 --- a/tests/plugins/temporal/disaster_test.go +++ /dev/null @@ -1,114 +0,0 @@ -package tests - -import ( - "context" - "os" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "go.temporal.io/sdk/client" -) - -func Test_WorkerError_DisasterRecovery(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - p, err := os.FindProcess(int(s.workflows.Workers()[0].Pid())) - assert.NoError(t, err) - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "TimerWorkflow", - "Hello World", - ) - assert.NoError(t, err) - - time.Sleep(time.Millisecond * 750) - - // must fully recover with new worker - assert.NoError(t, p.Kill()) - - var result string - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, "hello world", result) -} - -func Test_WorkerError_DisasterRecovery_Heavy(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - defer func() { - // always restore script - _ = os.Rename("worker.bak", "worker.php") - }() - - // Makes worker pool unable to recover for some time - _ = os.Rename("worker.php", "worker.bak") - - p, err := os.FindProcess(int(s.workflows.Workers()[0].Pid())) - assert.NoError(t, err) - - // must fully recover with new worker - assert.NoError(t, p.Kill()) - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "TimerWorkflow", - "Hello World", - ) - assert.NoError(t, err) - - time.Sleep(time.Millisecond * 750) - - // restore the script and recover activity pool - _ = os.Rename("worker.bak", "worker.php") - - var result string - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, "hello world", result) -} - -func Test_ActivityError_DisasterRecovery(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - defer func() { - // always restore script - _ = os.Rename("worker.bak", "worker.php") - }() - - // Makes worker pool unable to recover for some time - _ = os.Rename("worker.php", "worker.bak") - - // destroys all workers in activities - for _, wrk := range s.activities.Workers() { - assert.NoError(t, wrk.Kill()) - } - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "SimpleWorkflow", - "Hello World", - ) - assert.NoError(t, err) - - // activity can't complete at this moment - time.Sleep(time.Millisecond * 750) - - // restore the script and recover activity pool - _ = os.Rename("worker.bak", "worker.php") - - var result string - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, "HELLO WORLD", result) -} diff --git a/tests/plugins/temporal/hp_test.go b/tests/plugins/temporal/hp_test.go deleted file mode 100644 index bceac025..00000000 --- a/tests/plugins/temporal/hp_test.go +++ /dev/null @@ -1,408 +0,0 @@ -package tests - -import ( - "context" - "crypto/rand" - "crypto/sha512" - "fmt" - "testing" - "time" - - "go.temporal.io/api/common/v1" - - "github.com/fatih/color" - "github.com/stretchr/testify/assert" - "go.temporal.io/api/enums/v1" - "go.temporal.io/api/history/v1" - "go.temporal.io/sdk/client" -) - -func init() { - color.NoColor = false -} - -func Test_VerifyRegistration(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - assert.Contains(t, s.workflows.WorkflowNames(), "SimpleWorkflow") - - assert.Contains(t, s.activities.ActivityNames(), "SimpleActivity.echo") - assert.Contains(t, s.activities.ActivityNames(), "HeartBeatActivity.doSomething") - - assert.Contains(t, s.activities.ActivityNames(), "SimpleActivity.lower") -} - -func Test_ExecuteSimpleWorkflow_1(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "SimpleWorkflow", - "Hello World", - ) - assert.NoError(t, err) - - var result string - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, "HELLO WORLD", result) -} - -type User struct { - Name string - Email string -} - -func Test_ExecuteSimpleDTOWorkflow(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "SimpleDTOWorkflow", - User{ - Name: "Antony", - Email: "[email protected]", - }, - ) - assert.NoError(t, err) - - var result struct{ Message string } - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, "Hello Antony <[email protected]>", result.Message) -} - -func Test_ExecuteSimpleWorkflowWithSequenceInBatch(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "WorkflowWithSequence", - "Hello World", - ) - assert.NoError(t, err) - - var result string - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, "OK", result) -} - -func Test_MultipleWorkflowsInSingleWorker(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "SimpleWorkflow", - "Hello World", - ) - assert.NoError(t, err) - - w2, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "TimerWorkflow", - "Hello World", - ) - assert.NoError(t, err) - - var result string - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, "HELLO WORLD", result) - - assert.NoError(t, w2.Get(context.Background(), &result)) - assert.Equal(t, "hello world", result) -} - -func Test_ExecuteWorkflowWithParallelScopes(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "ParallelScopesWorkflow", - "Hello World", - ) - assert.NoError(t, err) - - var result string - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, "HELLO WORLD|Hello World|hello world", result) -} - -func Test_Timer(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - start := time.Now() - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "TimerWorkflow", - "Hello World", - ) - assert.NoError(t, err) - - var result string - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, "hello world", result) - assert.True(t, time.Since(start).Seconds() > 1) - - s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool { - return event.EventType == enums.EVENT_TYPE_TIMER_STARTED - }) -} - -func Test_SideEffect(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "SideEffectWorkflow", - "Hello World", - ) - assert.NoError(t, err) - - var result string - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Contains(t, result, "hello world-") - - s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool { - return event.EventType == enums.EVENT_TYPE_MARKER_RECORDED - }) -} - -func Test_EmptyWorkflow(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "EmptyWorkflow", - "Hello World", - ) - assert.NoError(t, err) - - var result int - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, 42, result) -} - -func Test_PromiseChaining(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "ChainedWorkflow", - "Hello World", - ) - assert.NoError(t, err) - - var result string - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, "result:hello world", result) -} - -func Test_ActivityHeartbeat(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "SimpleHeartbeatWorkflow", - 2, - ) - assert.NoError(t, err) - - time.Sleep(time.Second) - - we, err := s.Client().DescribeWorkflowExecution(context.Background(), w.GetID(), w.GetRunID()) - assert.NoError(t, err) - assert.Len(t, we.PendingActivities, 1) - - act := we.PendingActivities[0] - - assert.Equal(t, `{"value":2}`, string(act.HeartbeatDetails.Payloads[0].Data)) - - var result string - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, "OK", result) -} - -func Test_FailedActivityHeartbeat(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "FailedHeartbeatWorkflow", - 8, - ) - assert.NoError(t, err) - - time.Sleep(time.Second) - - we, err := s.Client().DescribeWorkflowExecution(context.Background(), w.GetID(), w.GetRunID()) - assert.NoError(t, err) - assert.Len(t, we.PendingActivities, 1) - - act := we.PendingActivities[0] - - assert.Equal(t, `{"value":8}`, string(act.HeartbeatDetails.Payloads[0].Data)) - - var result string - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, "OK!", result) -} - -func Test_BinaryPayload(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - rnd := make([]byte, 2500) - - _, err := rand.Read(rnd) - assert.NoError(t, err) - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "BinaryWorkflow", - rnd, - ) - assert.NoError(t, err) - - var result string - assert.NoError(t, w.Get(context.Background(), &result)) - - assert.Equal(t, fmt.Sprintf("%x", sha512.Sum512(rnd)), result) -} - -func Test_ContinueAsNew(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "ContinuableWorkflow", - 1, - ) - assert.NoError(t, err) - - time.Sleep(time.Second) - - we, err := s.Client().DescribeWorkflowExecution(context.Background(), w.GetID(), w.GetRunID()) - assert.NoError(t, err) - - assert.Equal(t, "ContinuedAsNew", we.WorkflowExecutionInfo.Status.String()) - - time.Sleep(time.Second) - - // the result of the final workflow - var result string - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, "OK6", result) -} - -func Test_ActivityStubWorkflow(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "ActivityStubWorkflow", - "hello world", - ) - assert.NoError(t, err) - - // the result of the final workflow - var result []string - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, []string{ - "HELLO WORLD", - "invalid method call", - "UNTYPED", - }, result) -} - -func Test_ExecuteProtoWorkflow(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "ProtoPayloadWorkflow", - ) - assert.NoError(t, err) - - var result common.WorkflowExecution - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, "updated", result.RunId) - assert.Equal(t, "workflow id", result.WorkflowId) -} - -func Test_SagaWorkflow(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "SagaWorkflow", - ) - assert.NoError(t, err) - - var result string - assert.Error(t, w.Get(context.Background(), &result)) -} diff --git a/tests/plugins/temporal/query_test.go b/tests/plugins/temporal/query_test.go deleted file mode 100644 index 8b0caeee..00000000 --- a/tests/plugins/temporal/query_test.go +++ /dev/null @@ -1,66 +0,0 @@ -package tests - -import ( - "context" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "go.temporal.io/sdk/client" -) - -func Test_ListQueries(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "QueryWorkflow", - "Hello World", - ) - assert.NoError(t, err) - - time.Sleep(time.Millisecond * 500) - - v, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "error", -1) - assert.Nil(t, v) - assert.Error(t, err) - - assert.Contains(t, err.Error(), "KnownQueryTypes=[get]") - - var r int - assert.NoError(t, w.Get(context.Background(), &r)) - assert.Equal(t, 0, r) -} - -func Test_GetQuery(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "QueryWorkflow", - "Hello World", - ) - assert.NoError(t, err) - - err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "add", 88) - assert.NoError(t, err) - time.Sleep(time.Millisecond * 500) - - v, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "get", nil) - assert.NoError(t, err) - - var r int - assert.NoError(t, v.Get(&r)) - assert.Equal(t, 88, r) - - assert.NoError(t, w.Get(context.Background(), &r)) - assert.Equal(t, 88, r) -} diff --git a/tests/plugins/temporal/server_test.go b/tests/plugins/temporal/server_test.go deleted file mode 100644 index c8d815c3..00000000 --- a/tests/plugins/temporal/server_test.go +++ /dev/null @@ -1,198 +0,0 @@ -package tests - -import ( - "context" - "testing" - - endure "github.com/spiral/endure/pkg/container" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/informer" - "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/plugins/resetter" - "github.com/spiral/roadrunner/v2/plugins/rpc" - "github.com/spiral/roadrunner/v2/plugins/server" - "github.com/spiral/roadrunner/v2/plugins/temporal/activity" - rrClient "github.com/spiral/roadrunner/v2/plugins/temporal/client" - "github.com/spiral/roadrunner/v2/plugins/temporal/workflow" - "go.temporal.io/api/enums/v1" - "go.temporal.io/api/history/v1" - temporalClient "go.temporal.io/sdk/client" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" -) - -type TestServer struct { - container endure.Container - temporal rrClient.Temporal - activities *activity.Plugin - workflows *workflow.Plugin -} - -type ConfigOption struct { - Name string - Value interface{} -} - -func NewOption(name string, value interface{}) ConfigOption { - return ConfigOption{Name: name, Value: value} -} - -func NewTestServer(opt ...ConfigOption) *TestServer { - e, err := endure.NewContainer(initLogger(), endure.RetryOnFail(false)) - if err != nil { - panic(err) - } - - t := &rrClient.Plugin{} - a := &activity.Plugin{} - w := &workflow.Plugin{} - - if err := e.Register(initConfig(opt...)); err != nil { - panic(err) - } - - if err := e.Register(&logger.ZapLogger{}); err != nil { - panic(err) - } - if err := e.Register(&resetter.Plugin{}); err != nil { - panic(err) - } - if err := e.Register(&informer.Plugin{}); err != nil { - panic(err) - } - if err := e.Register(&server.Plugin{}); err != nil { - panic(err) - } - if err := e.Register(&rpc.Plugin{}); err != nil { - panic(err) - } - - if err := e.Register(t); err != nil { - panic(err) - } - if err := e.Register(a); err != nil { - panic(err) - } - if err := e.Register(w); err != nil { - panic(err) - } - - if err := e.Init(); err != nil { - panic(err) - } - - errCh, err := e.Serve() - if err != nil { - panic(err) - } - - go func() { - err := <-errCh - er := e.Stop() - if er != nil { - panic(err) - } - }() - - return &TestServer{container: e, temporal: t, activities: a, workflows: w} -} - -func (s *TestServer) Client() temporalClient.Client { - return s.temporal.GetClient() -} - -func (s *TestServer) MustClose() { - err := s.container.Stop() - if err != nil { - panic(err) - } -} - -func initConfig(opt ...ConfigOption) config.Configurer { - cfg := &config.Viper{} - cfg.Path = ".rr.yaml" - cfg.Prefix = "rr" - - return cfg -} - -func initLogger() *zap.Logger { - cfg := zap.Config{ - Level: zap.NewAtomicLevelAt(zap.ErrorLevel), - Encoding: "console", - EncoderConfig: zapcore.EncoderConfig{ - MessageKey: "message", - LevelKey: "level", - TimeKey: "time", - CallerKey: "caller", - NameKey: "name", - StacktraceKey: "stack", - EncodeLevel: zapcore.CapitalLevelEncoder, - EncodeTime: zapcore.ISO8601TimeEncoder, - EncodeCaller: zapcore.ShortCallerEncoder, - }, - OutputPaths: []string{"stderr"}, - ErrorOutputPaths: []string{"stderr"}, - } - - l, err := cfg.Build(zap.AddCaller()) - if err != nil { - panic(err) - } - - return l -} - -func (s *TestServer) AssertContainsEvent(t *testing.T, w temporalClient.WorkflowRun, assert func(*history.HistoryEvent) bool) { - i := s.Client().GetWorkflowHistory( - context.Background(), - w.GetID(), - w.GetRunID(), - false, - enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT, - ) - - for { - if !i.HasNext() { - t.Error("no more events and no match found") - break - } - - e, err := i.Next() - if err != nil { - t.Error("unable to read history event") - break - } - - if assert(e) { - break - } - } -} - -func (s *TestServer) AssertNotContainsEvent(t *testing.T, w temporalClient.WorkflowRun, assert func(*history.HistoryEvent) bool) { - i := s.Client().GetWorkflowHistory( - context.Background(), - w.GetID(), - w.GetRunID(), - false, - enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT, - ) - - for { - if !i.HasNext() { - break - } - - e, err := i.Next() - if err != nil { - t.Error("unable to read history event") - break - } - - if assert(e) { - t.Error("found unexpected event") - break - } - } -} diff --git a/tests/plugins/temporal/signal_test.go b/tests/plugins/temporal/signal_test.go deleted file mode 100644 index 51826287..00000000 --- a/tests/plugins/temporal/signal_test.go +++ /dev/null @@ -1,170 +0,0 @@ -package tests - -import ( - "context" - "testing" - "time" - - "github.com/pborman/uuid" - "github.com/stretchr/testify/assert" - "go.temporal.io/api/enums/v1" - "go.temporal.io/api/history/v1" - "go.temporal.io/sdk/client" -) - -func Test_SignalsWithoutSignals(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "SimpleSignalledWorkflow", - "Hello World", - ) - assert.NoError(t, err) - - var result int - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, 0, result) -} - -func Test_SendSignalDuringTimer(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().SignalWithStartWorkflow( - context.Background(), - "signalled-"+uuid.New(), - "add", - 10, - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "SimpleSignalledWorkflow", - ) - assert.NoError(t, err) - - err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "add", -1) - assert.NoError(t, err) - - var result int - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, 9, result) - - s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool { - if event.EventType == enums.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED { - attr := event.Attributes.(*history.HistoryEvent_WorkflowExecutionSignaledEventAttributes) - return attr.WorkflowExecutionSignaledEventAttributes.SignalName == "add" - } - - return false - }) -} - -func Test_SendSignalBeforeCompletingWorkflow(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "SimpleSignalledWorkflowWithSleep", - ) - assert.NoError(t, err) - - // should be around sleep(1) call - time.Sleep(time.Second + time.Millisecond*200) - - err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "add", -1) - assert.NoError(t, err) - - var result int - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, -1, result) - - s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool { - if event.EventType == enums.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED { - attr := event.Attributes.(*history.HistoryEvent_WorkflowExecutionSignaledEventAttributes) - return attr.WorkflowExecutionSignaledEventAttributes.SignalName == "add" - } - - return false - }) -} - -func Test_RuntimeSignal(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().SignalWithStartWorkflow( - context.Background(), - "signalled-"+uuid.New(), - "add", - -1, - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "RuntimeSignalWorkflow", - ) - assert.NoError(t, err) - - var result int - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, -1, result) - - s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool { - if event.EventType == enums.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED { - attr := event.Attributes.(*history.HistoryEvent_WorkflowExecutionSignaledEventAttributes) - return attr.WorkflowExecutionSignaledEventAttributes.SignalName == "add" - } - - return false - }) -} - -func Test_SignalSteps(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "WorkflowWithSignalledSteps", - ) - assert.NoError(t, err) - - err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "begin", true) - assert.NoError(t, err) - - err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "next1", true) - assert.NoError(t, err) - - v, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "value", nil) - assert.NoError(t, err) - - var r int - assert.NoError(t, v.Get(&r)) - assert.Equal(t, 2, r) - - err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "next2", true) - assert.NoError(t, err) - - v, err = s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "value", nil) - assert.NoError(t, err) - - assert.NoError(t, v.Get(&r)) - assert.Equal(t, 3, r) - - var result int - assert.NoError(t, w.Get(context.Background(), &result)) - - // 3 ticks - assert.Equal(t, 3, result) -} diff --git a/tests/plugins/temporal/worker.php b/tests/plugins/temporal/worker.php deleted file mode 100644 index 0d0263e7..00000000 --- a/tests/plugins/temporal/worker.php +++ /dev/null @@ -1,33 +0,0 @@ -<?php - -declare(strict_types=1); - -require __DIR__ . '/../../vendor/autoload.php'; - -/** - * @param string $dir - * @return array<string> - */ -$getClasses = static function (string $dir): iterable { - $files = glob($dir . '/*.php'); - - foreach ($files as $file) { - yield substr(basename($file), 0, -4); - } -}; - -$factory = \Temporal\WorkerFactory::create(); - -$worker = $factory->newWorker('default'); - -// register all workflows -foreach ($getClasses(__DIR__ . '/../../temporal/Workflow') as $name) { - $worker->registerWorkflowType('Temporal\\Tests\\Workflow\\' . $name); -} - -// register all activity -foreach ($getClasses(__DIR__ . '/../../temporal/Activity') as $name) { - $worker->registerActivityType('Temporal\\Tests\\Activity\\' . $name); -} - -$factory->run(); |