diff options
author | Valery Piashchynski <[email protected]> | 2021-01-26 11:52:03 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2021-01-26 11:52:03 +0300 |
commit | e2266b80db47444ba5858c736833a8a81b1361ad (patch) | |
tree | 37e06810352752f88032f7d0eadb554fa18b98da /tests | |
parent | fae4711e3548bfd2e34f13aabfaab6a5b4e317c6 (diff) | |
parent | a392d962508e1bc9e497c8c4ef021425bc2c67c2 (diff) |
Merge pull request #502 from spiral/plugin/temporalv2.0.0-beta12
plugin(temporal): Add temporal plugins set to the RR2
Diffstat (limited to 'tests')
59 files changed, 3105 insertions, 22 deletions
diff --git a/tests/composer.json b/tests/composer.json index 889b6808..060f105e 100644 --- a/tests/composer.json +++ b/tests/composer.json @@ -1,14 +1,15 @@ { - "minimum-stability": "beta", + "minimum-stability": "dev", "prefer-stable": true, "require": { "nyholm/psr7": "^1.3", "spiral/roadrunner": "^2.0", - "spiral/roadrunner-http": "^2.0" + "spiral/roadrunner-http": "^2.0", + "temporal/sdk": "dev-master" }, "autoload": { "psr-4": { - "Spiral\\RoadRunner\\": "src/" + "Temporal\\Tests\\": "temporal" } } } diff --git a/tests/docker-compose.yaml b/tests/docker-compose.yaml index fd1a48bf..fa1070e1 100644 --- a/tests/docker-compose.yaml +++ b/tests/docker-compose.yaml @@ -4,4 +4,36 @@ services: memcached: image: memcached:latest ports: - - "0.0.0.0:11211:11211"
\ No newline at end of file + - "0.0.0.0:11211:11211" + cassandra: + image: cassandra:3.11 + ports: + - "9042:9042" + temporal: + image: temporalio/auto-setup:${SERVER_TAG:-1.1.0} + ports: + - "7233:7233" + volumes: + - ${DYNAMIC_CONFIG_DIR:-../config/dynamicconfig}:/etc/temporal/config/dynamicconfig + environment: + - "CASSANDRA_SEEDS=cassandra" + - "DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development.yaml" + depends_on: + - cassandra + temporal-admin-tools: + image: temporalio/admin-tools:${SERVER_TAG:-1.1.0} + stdin_open: true + tty: true + environment: + - "TEMPORAL_CLI_ADDRESS=temporal:7233" + depends_on: + - temporal + temporal-web: + image: temporalio/web:${WEB_TAG:-1.5.0} + environment: + - "TEMPORAL_GRPC_ENDPOINT=temporal:7233" + - "TEMPORAL_PERMIT_WRITE_API=true" + ports: + - "8088:8088" + depends_on: + - temporal
\ No newline at end of file diff --git a/tests/plugins/http/handler_test.go b/tests/plugins/http/handler_test.go index e47dbd44..fc672e36 100644 --- a/tests/plugins/http/handler_test.go +++ b/tests/plugins/http/handler_test.go @@ -156,7 +156,7 @@ func TestHandler_Empty_User_Agent(t *testing.T) { }, nil, p) assert.NoError(t, err) - hs := &http.Server{Addr: ":8088", Handler: h} + hs := &http.Server{Addr: ":19658", Handler: h} defer func() { err := hs.Shutdown(context.Background()) if err != nil { @@ -172,7 +172,7 @@ func TestHandler_Empty_User_Agent(t *testing.T) { }() time.Sleep(time.Millisecond * 10) - req, err := http.NewRequest("GET", "http://localhost:8088?hello=world", nil) + req, err := http.NewRequest("GET", "http://localhost:19658?hello=world", nil) assert.NoError(t, err) req.Header.Add("user-agent", "") @@ -216,7 +216,7 @@ func TestHandler_User_Agent(t *testing.T) { }, nil, p) assert.NoError(t, err) - hs := &http.Server{Addr: ":8088", Handler: h} + hs := &http.Server{Addr: ":25688", Handler: h} defer func() { err := hs.Shutdown(context.Background()) if err != nil { @@ -232,7 +232,7 @@ func TestHandler_User_Agent(t *testing.T) { }() time.Sleep(time.Millisecond * 10) - req, err := http.NewRequest("GET", "http://localhost:8088?hello=world", nil) + req, err := http.NewRequest("GET", "http://localhost:25688?hello=world", nil) assert.NoError(t, err) req.Header.Add("User-Agent", "go-agent") diff --git a/tests/plugins/temporal/.rr.yaml b/tests/plugins/temporal/.rr.yaml new file mode 100644 index 00000000..04d0730d --- /dev/null +++ b/tests/plugins/temporal/.rr.yaml @@ -0,0 +1,22 @@ +# Application configuration +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php worker.php" + +# Workflow and activity mesh service +temporal: + address: "localhost:7233" + activities: + num_workers: 4 + codec: json + debug_level: 2 + +logs: + mode: none + channels: + activities: + mode: none + #workflows: + #mode: none
\ No newline at end of file diff --git a/tests/plugins/temporal/cancel_test.go b/tests/plugins/temporal/cancel_test.go new file mode 100644 index 00000000..0fd3c126 --- /dev/null +++ b/tests/plugins/temporal/cancel_test.go @@ -0,0 +1,291 @@ +package tests + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.temporal.io/api/enums/v1" + "go.temporal.io/api/history/v1" + "go.temporal.io/sdk/client" +) + +func Test_SimpleWorkflowCancel(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "SimpleSignalledWorkflow") + assert.NoError(t, err) + + time.Sleep(time.Millisecond * 500) + err = s.Client().CancelWorkflow(context.Background(), w.GetID(), w.GetRunID()) + assert.NoError(t, err) + + var result interface{} + assert.Error(t, w.Get(context.Background(), &result)) + + we, err := s.Client().DescribeWorkflowExecution(context.Background(), w.GetID(), w.GetRunID()) + assert.NoError(t, err) + + assert.Equal(t, "Canceled", we.WorkflowExecutionInfo.Status.String()) +} + +func Test_CancellableWorkflowScope(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "CancelledScopeWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + var result string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "yes", result) + + s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool { + return event.EventType == enums.EVENT_TYPE_TIMER_CANCELED + }) + + s.AssertNotContainsEvent(t, w, func(event *history.HistoryEvent) bool { + return event.EventType == enums.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED + }) +} + +func Test_CancelledWorkflow(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "CancelledWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + time.Sleep(time.Second) + err = s.Client().CancelWorkflow(context.Background(), w.GetID(), w.GetRunID()) + assert.NoError(t, err) + + var result interface{} + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "CANCELLED", result) +} + +func Test_CancelledWithCompensationWorkflow(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "CancelledWithCompensationWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + time.Sleep(time.Second) + err = s.Client().CancelWorkflow(context.Background(), w.GetID(), w.GetRunID()) + assert.NoError(t, err) + + var result interface{} + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "OK", result) + + e, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "getStatus") + assert.NoError(t, err) + + trace := make([]string, 0) + assert.NoError(t, e.Get(&trace)) + assert.Equal( + t, + []string{ + "yield", + "rollback", + "captured retry", + "captured promise on cancelled", + "START rollback", + "WAIT ROLLBACK", + "RESULT (ROLLBACK)", "DONE rollback", + "COMPLETE rollback", + "result: OK", + }, + trace, + ) +} + +func Test_CancelledNestedWorkflow(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "CancelledNestedWorkflow", + ) + assert.NoError(t, err) + + time.Sleep(time.Second) + + err = s.Client().CancelWorkflow(context.Background(), w.GetID(), w.GetRunID()) + assert.NoError(t, err) + + var result interface{} + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "CANCELLED", result) + + e, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "getStatus") + assert.NoError(t, err) + + trace := make([]string, 0) + assert.NoError(t, e.Get(&trace)) + assert.Equal( + t, + []string{ + "begin", + "first scope", + "second scope", + "close second scope", + "close first scope", + "second scope cancelled", + "first scope cancelled", + "close process", + }, + trace, + ) +} + +func Test_CancelledNSingleScopeWorkflow(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "CancelledSingleScopeWorkflow", + ) + assert.NoError(t, err) + + time.Sleep(time.Second) + + err = s.Client().CancelWorkflow(context.Background(), w.GetID(), w.GetRunID()) + assert.NoError(t, err) + + var result interface{} + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "OK", result) + + e, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "getStatus") + assert.NoError(t, err) + + trace := make([]string, 0) + assert.NoError(t, e.Get(&trace)) + assert.Equal( + t, + []string{ + "start", + "in scope", + "on cancel", + "captured in scope", + "captured in process", + }, + trace, + ) +} + +func Test_CancelledMidflightWorkflow(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "CancelledMidflightWorkflow", + ) + assert.NoError(t, err) + + var result interface{} + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "OK", result) + + e, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "getStatus") + assert.NoError(t, err) + + trace := make([]string, 0) + assert.NoError(t, e.Get(&trace)) + assert.Equal( + t, + []string{ + "start", + "in scope", + "on cancel", + "done cancel", + }, + trace, + ) + + s.AssertNotContainsEvent(t, w, func(event *history.HistoryEvent) bool { + return event.EventType == enums.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED + }) +} + +func Test_CancelSignalledChildWorkflow(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "CancelSignalledChildWorkflow", + ) + assert.NoError(t, err) + + var result interface{} + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "cancelled ok", result) + + e, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "getStatus") + assert.NoError(t, err) + + trace := make([]string, 0) + assert.NoError(t, e.Get(&trace)) + assert.Equal( + t, + []string{ + "start", + "child started", + "child signalled", + "scope cancelled", + "process done", + }, + trace, + ) + + s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool { + return event.EventType == enums.EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED + }) +} diff --git a/tests/plugins/temporal/child_test.go b/tests/plugins/temporal/child_test.go new file mode 100644 index 00000000..49521791 --- /dev/null +++ b/tests/plugins/temporal/child_test.go @@ -0,0 +1,84 @@ +package tests + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "go.temporal.io/sdk/client" +) + +func Test_ExecuteChildWorkflow(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "WithChildWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + var result string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "Child: CHILD HELLO WORLD", result) +} + +func Test_ExecuteChildStubWorkflow(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "WithChildStubWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + var result string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "Child: CHILD HELLO WORLD", result) +} + +func Test_ExecuteChildStubWorkflow_02(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "ChildStubWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + var result []string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, []string{"HELLO WORLD", "UNTYPED"}, result) +} + +func Test_SignalChildViaStubWorkflow(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "SignalChildViaStubWorkflow", + ) + assert.NoError(t, err) + + var result int + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, 8, result) +} diff --git a/tests/plugins/temporal/disaster_test.go b/tests/plugins/temporal/disaster_test.go new file mode 100644 index 00000000..9ca4d018 --- /dev/null +++ b/tests/plugins/temporal/disaster_test.go @@ -0,0 +1,114 @@ +package tests + +import ( + "context" + "os" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.temporal.io/sdk/client" +) + +func Test_WorkerError_DisasterRecovery(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + p, err := os.FindProcess(int(s.workflows.Workers()[0].Pid())) + assert.NoError(t, err) + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "TimerWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + time.Sleep(time.Millisecond * 750) + + // must fully recover with new worker + assert.NoError(t, p.Kill()) + + var result string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "hello world", result) +} + +func Test_WorkerError_DisasterRecovery_Heavy(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + defer func() { + // always restore script + _ = os.Rename("worker.bak", "worker.php") + }() + + // Makes worker pool unable to recover for some time + _ = os.Rename("worker.php", "worker.bak") + + p, err := os.FindProcess(int(s.workflows.Workers()[0].Pid())) + assert.NoError(t, err) + + // must fully recover with new worker + assert.NoError(t, p.Kill()) + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "TimerWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + time.Sleep(time.Millisecond * 750) + + // restore the script and recover activity pool + _ = os.Rename("worker.bak", "worker.php") + + var result string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "hello world", result) +} + +func Test_ActivityError_DisasterRecovery(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + defer func() { + // always restore script + _ = os.Rename("worker.bak", "worker.php") + }() + + // Makes worker pool unable to recover for some time + _ = os.Rename("worker.php", "worker.bak") + + // destroys all workers in activities + for _, wrk := range s.activities.Workers() { + assert.NoError(t, wrk.Kill()) + } + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "SimpleWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + // activity can't complete at this moment + time.Sleep(time.Millisecond * 750) + + // restore the script and recover activity pool + _ = os.Rename("worker.bak", "worker.php") + + var result string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "HELLO WORLD", result) +} diff --git a/tests/plugins/temporal/hp_test.go b/tests/plugins/temporal/hp_test.go new file mode 100644 index 00000000..bceac025 --- /dev/null +++ b/tests/plugins/temporal/hp_test.go @@ -0,0 +1,408 @@ +package tests + +import ( + "context" + "crypto/rand" + "crypto/sha512" + "fmt" + "testing" + "time" + + "go.temporal.io/api/common/v1" + + "github.com/fatih/color" + "github.com/stretchr/testify/assert" + "go.temporal.io/api/enums/v1" + "go.temporal.io/api/history/v1" + "go.temporal.io/sdk/client" +) + +func init() { + color.NoColor = false +} + +func Test_VerifyRegistration(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + assert.Contains(t, s.workflows.WorkflowNames(), "SimpleWorkflow") + + assert.Contains(t, s.activities.ActivityNames(), "SimpleActivity.echo") + assert.Contains(t, s.activities.ActivityNames(), "HeartBeatActivity.doSomething") + + assert.Contains(t, s.activities.ActivityNames(), "SimpleActivity.lower") +} + +func Test_ExecuteSimpleWorkflow_1(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "SimpleWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + var result string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "HELLO WORLD", result) +} + +type User struct { + Name string + Email string +} + +func Test_ExecuteSimpleDTOWorkflow(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "SimpleDTOWorkflow", + User{ + Name: "Antony", + Email: "[email protected]", + }, + ) + assert.NoError(t, err) + + var result struct{ Message string } + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "Hello Antony <[email protected]>", result.Message) +} + +func Test_ExecuteSimpleWorkflowWithSequenceInBatch(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "WorkflowWithSequence", + "Hello World", + ) + assert.NoError(t, err) + + var result string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "OK", result) +} + +func Test_MultipleWorkflowsInSingleWorker(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "SimpleWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + w2, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "TimerWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + var result string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "HELLO WORLD", result) + + assert.NoError(t, w2.Get(context.Background(), &result)) + assert.Equal(t, "hello world", result) +} + +func Test_ExecuteWorkflowWithParallelScopes(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "ParallelScopesWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + var result string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "HELLO WORLD|Hello World|hello world", result) +} + +func Test_Timer(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + start := time.Now() + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "TimerWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + var result string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "hello world", result) + assert.True(t, time.Since(start).Seconds() > 1) + + s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool { + return event.EventType == enums.EVENT_TYPE_TIMER_STARTED + }) +} + +func Test_SideEffect(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "SideEffectWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + var result string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Contains(t, result, "hello world-") + + s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool { + return event.EventType == enums.EVENT_TYPE_MARKER_RECORDED + }) +} + +func Test_EmptyWorkflow(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "EmptyWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + var result int + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, 42, result) +} + +func Test_PromiseChaining(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "ChainedWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + var result string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "result:hello world", result) +} + +func Test_ActivityHeartbeat(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "SimpleHeartbeatWorkflow", + 2, + ) + assert.NoError(t, err) + + time.Sleep(time.Second) + + we, err := s.Client().DescribeWorkflowExecution(context.Background(), w.GetID(), w.GetRunID()) + assert.NoError(t, err) + assert.Len(t, we.PendingActivities, 1) + + act := we.PendingActivities[0] + + assert.Equal(t, `{"value":2}`, string(act.HeartbeatDetails.Payloads[0].Data)) + + var result string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "OK", result) +} + +func Test_FailedActivityHeartbeat(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "FailedHeartbeatWorkflow", + 8, + ) + assert.NoError(t, err) + + time.Sleep(time.Second) + + we, err := s.Client().DescribeWorkflowExecution(context.Background(), w.GetID(), w.GetRunID()) + assert.NoError(t, err) + assert.Len(t, we.PendingActivities, 1) + + act := we.PendingActivities[0] + + assert.Equal(t, `{"value":8}`, string(act.HeartbeatDetails.Payloads[0].Data)) + + var result string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "OK!", result) +} + +func Test_BinaryPayload(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + rnd := make([]byte, 2500) + + _, err := rand.Read(rnd) + assert.NoError(t, err) + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "BinaryWorkflow", + rnd, + ) + assert.NoError(t, err) + + var result string + assert.NoError(t, w.Get(context.Background(), &result)) + + assert.Equal(t, fmt.Sprintf("%x", sha512.Sum512(rnd)), result) +} + +func Test_ContinueAsNew(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "ContinuableWorkflow", + 1, + ) + assert.NoError(t, err) + + time.Sleep(time.Second) + + we, err := s.Client().DescribeWorkflowExecution(context.Background(), w.GetID(), w.GetRunID()) + assert.NoError(t, err) + + assert.Equal(t, "ContinuedAsNew", we.WorkflowExecutionInfo.Status.String()) + + time.Sleep(time.Second) + + // the result of the final workflow + var result string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "OK6", result) +} + +func Test_ActivityStubWorkflow(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "ActivityStubWorkflow", + "hello world", + ) + assert.NoError(t, err) + + // the result of the final workflow + var result []string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, []string{ + "HELLO WORLD", + "invalid method call", + "UNTYPED", + }, result) +} + +func Test_ExecuteProtoWorkflow(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "ProtoPayloadWorkflow", + ) + assert.NoError(t, err) + + var result common.WorkflowExecution + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "updated", result.RunId) + assert.Equal(t, "workflow id", result.WorkflowId) +} + +func Test_SagaWorkflow(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "SagaWorkflow", + ) + assert.NoError(t, err) + + var result string + assert.Error(t, w.Get(context.Background(), &result)) +} diff --git a/tests/plugins/temporal/query_test.go b/tests/plugins/temporal/query_test.go new file mode 100644 index 00000000..8b0caeee --- /dev/null +++ b/tests/plugins/temporal/query_test.go @@ -0,0 +1,66 @@ +package tests + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.temporal.io/sdk/client" +) + +func Test_ListQueries(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "QueryWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + time.Sleep(time.Millisecond * 500) + + v, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "error", -1) + assert.Nil(t, v) + assert.Error(t, err) + + assert.Contains(t, err.Error(), "KnownQueryTypes=[get]") + + var r int + assert.NoError(t, w.Get(context.Background(), &r)) + assert.Equal(t, 0, r) +} + +func Test_GetQuery(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "QueryWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "add", 88) + assert.NoError(t, err) + time.Sleep(time.Millisecond * 500) + + v, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "get", nil) + assert.NoError(t, err) + + var r int + assert.NoError(t, v.Get(&r)) + assert.Equal(t, 88, r) + + assert.NoError(t, w.Get(context.Background(), &r)) + assert.Equal(t, 88, r) +} diff --git a/tests/plugins/temporal/server_test.go b/tests/plugins/temporal/server_test.go new file mode 100644 index 00000000..c8d815c3 --- /dev/null +++ b/tests/plugins/temporal/server_test.go @@ -0,0 +1,198 @@ +package tests + +import ( + "context" + "testing" + + endure "github.com/spiral/endure/pkg/container" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/informer" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/resetter" + "github.com/spiral/roadrunner/v2/plugins/rpc" + "github.com/spiral/roadrunner/v2/plugins/server" + "github.com/spiral/roadrunner/v2/plugins/temporal/activity" + rrClient "github.com/spiral/roadrunner/v2/plugins/temporal/client" + "github.com/spiral/roadrunner/v2/plugins/temporal/workflow" + "go.temporal.io/api/enums/v1" + "go.temporal.io/api/history/v1" + temporalClient "go.temporal.io/sdk/client" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +type TestServer struct { + container endure.Container + temporal rrClient.Temporal + activities *activity.Plugin + workflows *workflow.Plugin +} + +type ConfigOption struct { + Name string + Value interface{} +} + +func NewOption(name string, value interface{}) ConfigOption { + return ConfigOption{Name: name, Value: value} +} + +func NewTestServer(opt ...ConfigOption) *TestServer { + e, err := endure.NewContainer(initLogger(), endure.RetryOnFail(false)) + if err != nil { + panic(err) + } + + t := &rrClient.Plugin{} + a := &activity.Plugin{} + w := &workflow.Plugin{} + + if err := e.Register(initConfig(opt...)); err != nil { + panic(err) + } + + if err := e.Register(&logger.ZapLogger{}); err != nil { + panic(err) + } + if err := e.Register(&resetter.Plugin{}); err != nil { + panic(err) + } + if err := e.Register(&informer.Plugin{}); err != nil { + panic(err) + } + if err := e.Register(&server.Plugin{}); err != nil { + panic(err) + } + if err := e.Register(&rpc.Plugin{}); err != nil { + panic(err) + } + + if err := e.Register(t); err != nil { + panic(err) + } + if err := e.Register(a); err != nil { + panic(err) + } + if err := e.Register(w); err != nil { + panic(err) + } + + if err := e.Init(); err != nil { + panic(err) + } + + errCh, err := e.Serve() + if err != nil { + panic(err) + } + + go func() { + err := <-errCh + er := e.Stop() + if er != nil { + panic(err) + } + }() + + return &TestServer{container: e, temporal: t, activities: a, workflows: w} +} + +func (s *TestServer) Client() temporalClient.Client { + return s.temporal.GetClient() +} + +func (s *TestServer) MustClose() { + err := s.container.Stop() + if err != nil { + panic(err) + } +} + +func initConfig(opt ...ConfigOption) config.Configurer { + cfg := &config.Viper{} + cfg.Path = ".rr.yaml" + cfg.Prefix = "rr" + + return cfg +} + +func initLogger() *zap.Logger { + cfg := zap.Config{ + Level: zap.NewAtomicLevelAt(zap.ErrorLevel), + Encoding: "console", + EncoderConfig: zapcore.EncoderConfig{ + MessageKey: "message", + LevelKey: "level", + TimeKey: "time", + CallerKey: "caller", + NameKey: "name", + StacktraceKey: "stack", + EncodeLevel: zapcore.CapitalLevelEncoder, + EncodeTime: zapcore.ISO8601TimeEncoder, + EncodeCaller: zapcore.ShortCallerEncoder, + }, + OutputPaths: []string{"stderr"}, + ErrorOutputPaths: []string{"stderr"}, + } + + l, err := cfg.Build(zap.AddCaller()) + if err != nil { + panic(err) + } + + return l +} + +func (s *TestServer) AssertContainsEvent(t *testing.T, w temporalClient.WorkflowRun, assert func(*history.HistoryEvent) bool) { + i := s.Client().GetWorkflowHistory( + context.Background(), + w.GetID(), + w.GetRunID(), + false, + enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT, + ) + + for { + if !i.HasNext() { + t.Error("no more events and no match found") + break + } + + e, err := i.Next() + if err != nil { + t.Error("unable to read history event") + break + } + + if assert(e) { + break + } + } +} + +func (s *TestServer) AssertNotContainsEvent(t *testing.T, w temporalClient.WorkflowRun, assert func(*history.HistoryEvent) bool) { + i := s.Client().GetWorkflowHistory( + context.Background(), + w.GetID(), + w.GetRunID(), + false, + enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT, + ) + + for { + if !i.HasNext() { + break + } + + e, err := i.Next() + if err != nil { + t.Error("unable to read history event") + break + } + + if assert(e) { + t.Error("found unexpected event") + break + } + } +} diff --git a/tests/plugins/temporal/signal_test.go b/tests/plugins/temporal/signal_test.go new file mode 100644 index 00000000..51826287 --- /dev/null +++ b/tests/plugins/temporal/signal_test.go @@ -0,0 +1,170 @@ +package tests + +import ( + "context" + "testing" + "time" + + "github.com/pborman/uuid" + "github.com/stretchr/testify/assert" + "go.temporal.io/api/enums/v1" + "go.temporal.io/api/history/v1" + "go.temporal.io/sdk/client" +) + +func Test_SignalsWithoutSignals(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "SimpleSignalledWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + var result int + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, 0, result) +} + +func Test_SendSignalDuringTimer(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().SignalWithStartWorkflow( + context.Background(), + "signalled-"+uuid.New(), + "add", + 10, + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "SimpleSignalledWorkflow", + ) + assert.NoError(t, err) + + err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "add", -1) + assert.NoError(t, err) + + var result int + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, 9, result) + + s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool { + if event.EventType == enums.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED { + attr := event.Attributes.(*history.HistoryEvent_WorkflowExecutionSignaledEventAttributes) + return attr.WorkflowExecutionSignaledEventAttributes.SignalName == "add" + } + + return false + }) +} + +func Test_SendSignalBeforeCompletingWorkflow(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "SimpleSignalledWorkflowWithSleep", + ) + assert.NoError(t, err) + + // should be around sleep(1) call + time.Sleep(time.Second + time.Millisecond*200) + + err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "add", -1) + assert.NoError(t, err) + + var result int + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, -1, result) + + s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool { + if event.EventType == enums.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED { + attr := event.Attributes.(*history.HistoryEvent_WorkflowExecutionSignaledEventAttributes) + return attr.WorkflowExecutionSignaledEventAttributes.SignalName == "add" + } + + return false + }) +} + +func Test_RuntimeSignal(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().SignalWithStartWorkflow( + context.Background(), + "signalled-"+uuid.New(), + "add", + -1, + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "RuntimeSignalWorkflow", + ) + assert.NoError(t, err) + + var result int + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, -1, result) + + s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool { + if event.EventType == enums.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED { + attr := event.Attributes.(*history.HistoryEvent_WorkflowExecutionSignaledEventAttributes) + return attr.WorkflowExecutionSignaledEventAttributes.SignalName == "add" + } + + return false + }) +} + +func Test_SignalSteps(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "WorkflowWithSignalledSteps", + ) + assert.NoError(t, err) + + err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "begin", true) + assert.NoError(t, err) + + err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "next1", true) + assert.NoError(t, err) + + v, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "value", nil) + assert.NoError(t, err) + + var r int + assert.NoError(t, v.Get(&r)) + assert.Equal(t, 2, r) + + err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "next2", true) + assert.NoError(t, err) + + v, err = s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "value", nil) + assert.NoError(t, err) + + assert.NoError(t, v.Get(&r)) + assert.Equal(t, 3, r) + + var result int + assert.NoError(t, w.Get(context.Background(), &result)) + + // 3 ticks + assert.Equal(t, 3, result) +} diff --git a/tests/plugins/temporal/worker.php b/tests/plugins/temporal/worker.php new file mode 100644 index 00000000..0d0263e7 --- /dev/null +++ b/tests/plugins/temporal/worker.php @@ -0,0 +1,33 @@ +<?php + +declare(strict_types=1); + +require __DIR__ . '/../../vendor/autoload.php'; + +/** + * @param string $dir + * @return array<string> + */ +$getClasses = static function (string $dir): iterable { + $files = glob($dir . '/*.php'); + + foreach ($files as $file) { + yield substr(basename($file), 0, -4); + } +}; + +$factory = \Temporal\WorkerFactory::create(); + +$worker = $factory->newWorker('default'); + +// register all workflows +foreach ($getClasses(__DIR__ . '/../../temporal/Workflow') as $name) { + $worker->registerWorkflowType('Temporal\\Tests\\Workflow\\' . $name); +} + +// register all activity +foreach ($getClasses(__DIR__ . '/../../temporal/Activity') as $name) { + $worker->registerActivityType('Temporal\\Tests\\Activity\\' . $name); +} + +$factory->run(); diff --git a/tests/psr-worker-bench.php b/tests/psr-worker-bench.php index d382098a..ef741a61 100644 --- a/tests/psr-worker-bench.php +++ b/tests/psr-worker-bench.php @@ -1,24 +1,58 @@ <?php +declare(strict_types=1); + use Spiral\RoadRunner; use Nyholm\Psr7\Factory; ini_set('display_errors', 'stderr'); include "vendor/autoload.php"; -$worker = new RoadRunner\Http\PSR7Worker( - RoadRunner\Worker::create(), - new Factory\Psr17Factory(), - new Factory\Psr17Factory(), - new Factory\Psr17Factory() -); - -while ($req = $worker->waitRequest()) { - try { - $rsp = new \Nyholm\Psr7\Response(); - $rsp->getBody()->write("hello world"); - $worker->respond($rsp); - } catch (\Throwable $e) { - $worker->getWorker()->error((string)$e); +$env = \Spiral\RoadRunner\Environment::fromGlobals(); + +if ($env->getMode() === 'http') { + $worker = new RoadRunner\Http\PSR7Worker( + RoadRunner\Worker::create(), + new Factory\Psr17Factory(), + new Factory\Psr17Factory(), + new Factory\Psr17Factory() + ); + + while ($req = $worker->waitRequest()) { + try { + $rsp = new \Nyholm\Psr7\Response(); + $rsp->getBody()->write("hello world"); + $worker->respond($rsp); + } catch (\Throwable $e) { + $worker->getWorker()->error((string)$e); + } } +} else { + /** + * @param string $dir + * @return array<string> + */ + $getClasses = static function (string $dir): iterable { + $files = glob($dir . '/*.php'); + + foreach ($files as $file) { + yield substr(basename($file), 0, -4); + } + }; + + $factory = \Temporal\WorkerFactory::create(); + + $worker = $factory->newWorker('default'); + + // register all workflows + foreach ($getClasses(__DIR__ . '/../temporal/Workflow') as $name) { + $worker->registerWorkflowType('Temporal\\Tests\\Workflow\\' . $name); + } + + // register all activity + foreach ($getClasses(__DIR__ . '/../temporal/Activity') as $name) { + $worker->registerActivityType('Temporal\\Tests\\Activity\\' . $name); + } + + $factory->run(); }
\ No newline at end of file diff --git a/tests/temporal/Activity/HeartBeatActivity.php b/tests/temporal/Activity/HeartBeatActivity.php new file mode 100644 index 00000000..acf4a451 --- /dev/null +++ b/tests/temporal/Activity/HeartBeatActivity.php @@ -0,0 +1,58 @@ +<?php + +namespace Temporal\Tests\Activity; + +use Temporal\Activity; +use Temporal\Activity\ActivityInterface; +use Temporal\Activity\ActivityMethod; +use Temporal\Roadrunner\Internal\Error; + +#[ActivityInterface(prefix: "HeartBeatActivity.")] +class HeartBeatActivity +{ + #[ActivityMethod] + public function doSomething( + int $value + ): string { + Activity::heartbeat(['value' => $value]); + sleep($value); + return 'OK'; + } + + #[ActivityMethod] + public function slow( + string $value + ): string { + for ($i = 0; $i < 5; $i++) { + Activity::heartbeat(['value' => $i]); + sleep(1); + } + + return 'OK'; + } + + #[ActivityMethod] + public function something( + string $value + ): string { + Activity::heartbeat(['value' => $value]); + sleep($value); + return 'OK'; + } + + #[ActivityMethod] + public function failedActivity( + int $value + ): string { + Activity::heartbeat(['value' => $value]); + if (Activity::getInfo()->attempt === 1) { + throw new \Error("failed"); + } + + if (!is_array(Activity::getHeartbeatDetails())) { + throw new \Error("no heartbeat details"); + } + + return 'OK!'; + } +}
\ No newline at end of file diff --git a/tests/temporal/Activity/SimpleActivity.php b/tests/temporal/Activity/SimpleActivity.php new file mode 100644 index 00000000..576b126e --- /dev/null +++ b/tests/temporal/Activity/SimpleActivity.php @@ -0,0 +1,63 @@ +<?php + +namespace Temporal\Tests\Activity; + +use Temporal\Activity\ActivityInterface; +use Temporal\Activity\ActivityMethod; +use Temporal\Api\Common\V1\WorkflowExecution; +use Temporal\DataConverter\Bytes; +use Temporal\Tests\DTO\Message; +use Temporal\Tests\DTO\User; + +#[ActivityInterface(prefix: "SimpleActivity.")] +class SimpleActivity +{ + #[ActivityMethod] + public function echo( + string $input + ): string { + return strtoupper($input); + } + + #[ActivityMethod] + public function lower( + string $input + ): string { + return strtolower($input); + } + + #[ActivityMethod] + public function greet( + User $user + ): Message { + return new Message(sprintf("Hello %s <%s>", $user->name, $user->email)); + } + + #[ActivityMethod] + public function slow( + string $input + ): string { + sleep(2); + + return strtolower($input); + } + + #[ActivityMethod] + public function sha512( + Bytes $input + ): string { + return hash("sha512", ($input->getData())); + } + + public function updateRunID(WorkflowExecution $e): WorkflowExecution + { + $e->setRunId('updated'); + return $e; + } + + #[ActivityMethod] + public function fail() + { + throw new \Error("failed activity"); + } +}
\ No newline at end of file diff --git a/tests/temporal/Client/StartNewWorkflow.php b/tests/temporal/Client/StartNewWorkflow.php new file mode 100644 index 00000000..67bc1d01 --- /dev/null +++ b/tests/temporal/Client/StartNewWorkflow.php @@ -0,0 +1,23 @@ +<?php + + +namespace Temporal\Tests\Client; + +use Temporal\Client; +use Temporal\Tests\Workflow\SimpleDTOWorkflow; + +use function Symfony\Component\String\s; + +class StartNewWorkflow +{ + private $stub; + + public function __construct(Client\ClientInterface $client) + { + $this->stub = $client->newWorkflowStub(SimpleDTOWorkflow::class); + } + + public function __invoke() + { + } +} diff --git a/tests/temporal/DTO/Message.php b/tests/temporal/DTO/Message.php new file mode 100644 index 00000000..61703fe8 --- /dev/null +++ b/tests/temporal/DTO/Message.php @@ -0,0 +1,14 @@ +<?php + + +namespace Temporal\Tests\DTO; + +class Message +{ + public string $message; + + public function __construct(string $message) + { + $this->message = $message; + } +}
\ No newline at end of file diff --git a/tests/temporal/DTO/User.php b/tests/temporal/DTO/User.php new file mode 100644 index 00000000..cefea137 --- /dev/null +++ b/tests/temporal/DTO/User.php @@ -0,0 +1,15 @@ +<?php + + +namespace Temporal\Tests\DTO; + +use Temporal\Internal\Marshaller\Meta\Marshal; + +class User +{ + #[Marshal(name: "Name")] + public string $name; + + #[Marshal(name: "Email")] + public string $email; +}
\ No newline at end of file diff --git a/tests/temporal/Workflow/ActivityStubWorkflow.php b/tests/temporal/Workflow/ActivityStubWorkflow.php new file mode 100644 index 00000000..58dcdafb --- /dev/null +++ b/tests/temporal/Workflow/ActivityStubWorkflow.php @@ -0,0 +1,39 @@ +<?php + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityOptions; +use Temporal\Tests\Activity\SimpleActivity; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; + +#[Workflow\WorkflowInterface] +class ActivityStubWorkflow +{ + #[WorkflowMethod(name: 'ActivityStubWorkflow')] + public function handler( + string $input + ) { + // typed stub + $simple = Workflow::newActivityStub( + SimpleActivity::class, + ActivityOptions::new()->withStartToCloseTimeout(5) + ); + + $result = []; + $result[] = yield $simple->echo($input); + + try { + $simple->undefined($input); + } catch (\BadMethodCallException $e) { + $result[] = 'invalid method call'; + } + + // untyped stub + $untyped = Workflow::newUntypedActivityStub(ActivityOptions::new()->withStartToCloseTimeout(1)); + + $result[] = yield $untyped->execute('SimpleActivity.echo', ['untyped']); + + return $result; + } +} diff --git a/tests/temporal/Workflow/AggregatedWorkflow.php b/tests/temporal/Workflow/AggregatedWorkflow.php new file mode 100644 index 00000000..3299179e --- /dev/null +++ b/tests/temporal/Workflow/AggregatedWorkflow.php @@ -0,0 +1,30 @@ +<?php + +namespace Temporal\Tests\Workflow; + +use Temporal\Workflow; +use Temporal\Workflow\SignalMethod; +use Temporal\Workflow\WorkflowInterface; +use Temporal\Workflow\WorkflowMethod; + +#[WorkflowInterface] +class AggregatedWorkflow +{ + private array $values = []; + + #[SignalMethod] + public function addValue( + string $value + ) { + $this->values[] = $value; + } + + #[WorkflowMethod(name: 'AggregatedWorkflow')] + public function run( + int $count + ) { + yield Workflow::await(fn() => count($this->values) === $count); + + return $this->values; + } +} diff --git a/tests/temporal/Workflow/AsyncActivityWorkflow.php b/tests/temporal/Workflow/AsyncActivityWorkflow.php new file mode 100644 index 00000000..79e45dfb --- /dev/null +++ b/tests/temporal/Workflow/AsyncActivityWorkflow.php @@ -0,0 +1,28 @@ +<?php + +declare(strict_types=1); + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityCancellationType; +use Temporal\Activity\ActivityOptions; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; +use Temporal\Tests\Activity\SimpleActivity; + +#[Workflow\WorkflowInterface] +class AsyncActivityWorkflow +{ + #[WorkflowMethod(name: 'AsyncActivityWorkflow')] + public function handler() + { + $simple = Workflow::newActivityStub( + SimpleActivity::class, + ActivityOptions::new() + ->withStartToCloseTimeout(20) + ->withCancellationType(ActivityCancellationType::WAIT_CANCELLATION_COMPLETED) + ); + + return yield $simple->external(); + } +} diff --git a/tests/temporal/Workflow/BinaryWorkflow.php b/tests/temporal/Workflow/BinaryWorkflow.php new file mode 100644 index 00000000..ed1952ad --- /dev/null +++ b/tests/temporal/Workflow/BinaryWorkflow.php @@ -0,0 +1,21 @@ +<?php + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityOptions; +use Temporal\DataConverter\Bytes; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; + +#[Workflow\WorkflowInterface] +class BinaryWorkflow +{ + #[WorkflowMethod(name: 'BinaryWorkflow')] + public function handler( + Bytes $input + ): iterable { + $opts = ActivityOptions::new()->withStartToCloseTimeout(5); + + return yield Workflow::executeActivity('SimpleActivity.sha512', [$input], $opts); + } +} diff --git a/tests/temporal/Workflow/CancelSignalledChildWorkflow.php b/tests/temporal/Workflow/CancelSignalledChildWorkflow.php new file mode 100644 index 00000000..e2e43efa --- /dev/null +++ b/tests/temporal/Workflow/CancelSignalledChildWorkflow.php @@ -0,0 +1,57 @@ +<?php + +namespace Temporal\Tests\Workflow; + +use React\Promise\Deferred; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; + +#[Workflow\WorkflowInterface] +class CancelSignalledChildWorkflow +{ + private array $status = []; + + #[Workflow\QueryMethod(name: 'getStatus')] + public function getStatus(): array + { + return $this->status; + } + + #[WorkflowMethod(name: 'CancelSignalledChildWorkflow')] + public function handler() + { + // typed stub + $simple = Workflow::newChildWorkflowStub(SimpleSignalledWorkflow::class); + + $waitSignalled = new Deferred(); + + $this->status[] = 'start'; + + // start execution + $scope = Workflow::newCancellationScope( + function () use ($simple, $waitSignalled) { + $call = $simple->handler(); + $this->status[] = 'child started'; + + yield $simple->add(8); + $this->status[] = 'child signalled'; + $waitSignalled->resolve(); + + return yield $call; + } + ); + + // only cancel scope when signal dispatched + yield $waitSignalled; + $scope->cancel(); + $this->status[] = 'scope cancelled'; + + try { + return yield $scope; + } catch (\Throwable $e) { + $this->status[] = 'process done'; + + return 'cancelled ok'; + } + } +} diff --git a/tests/temporal/Workflow/CanceledHeartbeatWorkflow.php b/tests/temporal/Workflow/CanceledHeartbeatWorkflow.php new file mode 100644 index 00000000..6b463192 --- /dev/null +++ b/tests/temporal/Workflow/CanceledHeartbeatWorkflow.php @@ -0,0 +1,29 @@ +<?php + +declare(strict_types=1); + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityCancellationType; +use Temporal\Activity\ActivityOptions; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; +use Temporal\Tests\Activity\HeartBeatActivity; + +#[Workflow\WorkflowInterface] +class CanceledHeartbeatWorkflow +{ + #[WorkflowMethod(name: 'CanceledHeartbeatWorkflow')] + public function handler(): iterable + { + $act = Workflow::newActivityStub( + HeartBeatActivity::class, + ActivityOptions::new() + ->withStartToCloseTimeout(50) + ->withCancellationType(ActivityCancellationType::WAIT_CANCELLATION_COMPLETED) + ->withHeartbeatTimeout(1) + ); + + return yield $act->slow('test'); + } +} diff --git a/tests/temporal/Workflow/CancelledMidflightWorkflow.php b/tests/temporal/Workflow/CancelledMidflightWorkflow.php new file mode 100644 index 00000000..ea799ce1 --- /dev/null +++ b/tests/temporal/Workflow/CancelledMidflightWorkflow.php @@ -0,0 +1,47 @@ +<?php + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityOptions; +use Temporal\Tests\Activity\SimpleActivity; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; + +#[Workflow\WorkflowInterface] +class CancelledMidflightWorkflow +{ + private array $status = []; + + #[Workflow\QueryMethod(name: 'getStatus')] + public function getStatus(): array + { + return $this->status; + } + + #[WorkflowMethod(name: 'CancelledMidflightWorkflow')] + public function handler() + { + $simple = Workflow::newActivityStub( + SimpleActivity::class, + ActivityOptions::new()->withStartToCloseTimeout(5) + ); + + $this->status[] = 'start'; + + $scope = Workflow::newCancellationScope( + function () use ($simple) { + $this->status[] = 'in scope'; + $simple->slow('1'); + } + )->onCancel( + function () { + $this->status[] = 'on cancel'; + } + ); + + $scope->cancel(); + $this->status[] = 'done cancel'; + + return 'OK'; + } +} diff --git a/tests/temporal/Workflow/CancelledNestedWorkflow.php b/tests/temporal/Workflow/CancelledNestedWorkflow.php new file mode 100644 index 00000000..0c82f761 --- /dev/null +++ b/tests/temporal/Workflow/CancelledNestedWorkflow.php @@ -0,0 +1,72 @@ +<?php + +namespace Temporal\Tests\Workflow; + +use Temporal\Exception\Failure\CanceledFailure; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; + +#[Workflow\WorkflowInterface] +class CancelledNestedWorkflow +{ + private array $status = []; + + #[Workflow\QueryMethod(name: 'getStatus')] + public function getStatus(): array + { + return $this->status; + } + + #[WorkflowMethod(name: 'CancelledNestedWorkflow')] + public function handler() + { + $this->status[] = 'begin'; + try { + yield Workflow::newCancellationScope( + function () { + $this->status[] = 'first scope'; + + $scope = Workflow::newCancellationScope( + function () { + $this->status[] = 'second scope'; + + try { + yield Workflow::timer(2); + } catch (CanceledFailure $e) { + $this->status[] = 'second scope cancelled'; + throw $e; + } + + $this->status[] = 'second scope done'; + } + )->onCancel( + function () { + $this->status[] = 'close second scope'; + } + ); + + try { + yield Workflow::timer(1); + } catch (CanceledFailure $e) { + $this->status[] = 'first scope cancelled'; + throw $e; + } + + $this->status[] = 'first scope done'; + + yield $scope; + } + )->onCancel( + function () { + $this->status[] = 'close first scope'; + } + ); + } catch (CanceledFailure $e) { + $this->status[] = 'close process'; + + return 'CANCELLED'; + } + + return 'OK'; + } +} diff --git a/tests/temporal/Workflow/CancelledScopeWorkflow.php b/tests/temporal/Workflow/CancelledScopeWorkflow.php new file mode 100644 index 00000000..50e0992f --- /dev/null +++ b/tests/temporal/Workflow/CancelledScopeWorkflow.php @@ -0,0 +1,39 @@ +<?php + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityOptions; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; +use Temporal\Tests\Activity\SimpleActivity; + +#[Workflow\WorkflowInterface] +class CancelledScopeWorkflow +{ + #[WorkflowMethod(name: 'CancelledScopeWorkflow')] + public function handler() + { + $simple = Workflow::newActivityStub( + SimpleActivity::class, + ActivityOptions::new()->withStartToCloseTimeout(5) + ); + + $cancelled = 'not'; + + $scope = Workflow::newCancellationScope( + function () use ($simple) { + yield Workflow::timer(2); + yield $simple->slow('hello'); + } + )->onCancel( + function () use (&$cancelled) { + $cancelled = 'yes'; + } + ); + + yield Workflow::timer(1); + $scope->cancel(); + + return $cancelled; + } +} diff --git a/tests/temporal/Workflow/CancelledSingleScopeWorkflow.php b/tests/temporal/Workflow/CancelledSingleScopeWorkflow.php new file mode 100644 index 00000000..5fe8d3d8 --- /dev/null +++ b/tests/temporal/Workflow/CancelledSingleScopeWorkflow.php @@ -0,0 +1,55 @@ +<?php + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityOptions; +use Temporal\Exception\Failure\CanceledFailure; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; +use Temporal\Tests\Activity\SimpleActivity; + +#[Workflow\WorkflowInterface] +class CancelledSingleScopeWorkflow +{ + private array $status = []; + + #[Workflow\QueryMethod(name: 'getStatus')] + public function getStatus(): array + { + return $this->status; + } + + #[WorkflowMethod(name: 'CancelledSingleScopeWorkflow')] + public function handler() + { + $simple = Workflow::newActivityStub( + SimpleActivity::class, + ActivityOptions::new() + ->withStartToCloseTimeout(5) + ); + + $this->status[] = 'start'; + try { + yield Workflow::newCancellationScope( + function () use ($simple) { + try { + $this->status[] = 'in scope'; + yield $simple->slow('1'); + } catch (CanceledFailure $e) { + // after process is complete, do not use for business logic + $this->status[] = 'captured in scope'; + throw $e; + } + } + )->onCancel( + function () { + $this->status[] = 'on cancel'; + } + ); + } catch (CanceledFailure $e) { + $this->status[] = 'captured in process'; + } + + return 'OK'; + } +} diff --git a/tests/temporal/Workflow/CancelledWithCompensationWorkflow.php b/tests/temporal/Workflow/CancelledWithCompensationWorkflow.php new file mode 100644 index 00000000..2074aac1 --- /dev/null +++ b/tests/temporal/Workflow/CancelledWithCompensationWorkflow.php @@ -0,0 +1,79 @@ +<?php + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityOptions; +use Temporal\Exception\Failure\CanceledFailure; +use Temporal\Tests\Activity\SimpleActivity; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; + +#[Workflow\WorkflowInterface] +class CancelledWithCompensationWorkflow +{ + private array $status = []; + + #[Workflow\QueryMethod(name: 'getStatus')] + public function getStatus(): array + { + return $this->status; + } + + #[WorkflowMethod(name: 'CancelledWithCompensationWorkflow')] + public function handler() + { + $simple = Workflow::newActivityStub( + SimpleActivity::class, + ActivityOptions::new()->withStartToCloseTimeout(5) + ); + + // waits for 2 seconds + $slow = $simple->slow('DOING SLOW ACTIVITY'); + + try { + $this->status[] = 'yield'; + $result = yield $slow; + } catch (CanceledFailure $e) { + $this->status[] = 'rollback'; + + try { + // must fail again + $result = yield $slow; + } catch (CanceledFailure $e) { + $this->status[] = 'captured retry'; + } + + try { + // fail since on cancelled context + $result = yield $simple->echo('echo must fail'); + } catch (CanceledFailure $e) { + $this->status[] = 'captured promise on cancelled'; + } + + $scope = Workflow::newDetachedCancellationScope( + function () use ($simple) { + $this->status[] = 'START rollback'; + + $second = yield $simple->echo('rollback'); + + $this->status[] = sprintf("RESULT (%s)", $second); + + if ($second !== 'ROLLBACK') { + $this->status[] = 'FAIL rollback'; + return 'failed to compensate ' . $second; + } + $this->status[] = 'DONE rollback'; + + return 'OK'; + } + ); + + $this->status[] = 'WAIT ROLLBACK'; + $result = yield $scope; + $this->status[] = 'COMPLETE rollback'; + } + + $this->status[] = 'result: ' . $result; + return $result; + } +} diff --git a/tests/temporal/Workflow/CancelledWorkflow.php b/tests/temporal/Workflow/CancelledWorkflow.php new file mode 100644 index 00000000..be9f7542 --- /dev/null +++ b/tests/temporal/Workflow/CancelledWorkflow.php @@ -0,0 +1,31 @@ +<?php + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityOptions; +use Temporal\Exception\Failure\CanceledFailure; +use Temporal\Tests\Activity\SimpleActivity; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; + +#[Workflow\WorkflowInterface] +class CancelledWorkflow +{ + #[WorkflowMethod(name: 'CancelledWorkflow')] + public function handler() + { + $simple = Workflow::newActivityStub( + SimpleActivity::class, + ActivityOptions::new()->withStartToCloseTimeout(5) + ); + + // waits for 2 seconds + $slow = $simple->slow('DOING SLOW ACTIVITY'); + + try { + return yield $slow; + } catch (CanceledFailure $e) { + return "CANCELLED"; + } + } +} diff --git a/tests/temporal/Workflow/ChainedWorkflow.php b/tests/temporal/Workflow/ChainedWorkflow.php new file mode 100644 index 00000000..ba9c8f96 --- /dev/null +++ b/tests/temporal/Workflow/ChainedWorkflow.php @@ -0,0 +1,31 @@ +<?php + +declare(strict_types=1); + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityOptions; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; + +#[Workflow\WorkflowInterface] +class ChainedWorkflow +{ + #[WorkflowMethod(name: 'ChainedWorkflow')] + public function handler(string $input): iterable + { + $opts = ActivityOptions::new()->withStartToCloseTimeout(5); + + return yield Workflow::executeActivity( + 'SimpleActivity.echo', + [$input], + $opts + )->then(function ($result) use ($opts) { + return Workflow::executeActivity( + 'SimpleActivity.lower', + ['Result:' . $result], + $opts + ); + }); + } +} diff --git a/tests/temporal/Workflow/ChildStubWorkflow.php b/tests/temporal/Workflow/ChildStubWorkflow.php new file mode 100644 index 00000000..608962c2 --- /dev/null +++ b/tests/temporal/Workflow/ChildStubWorkflow.php @@ -0,0 +1,30 @@ +<?php + +namespace Temporal\Tests\Workflow; + +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; + +#[Workflow\WorkflowInterface] +class ChildStubWorkflow +{ + #[WorkflowMethod(name: 'ChildStubWorkflow')] + public function handler( + string $input + ) { + // typed stub + $simple = Workflow::newChildWorkflowStub(SimpleWorkflow::class); + + $result = []; + $result[] = yield $simple->handler($input); + + // untyped + $untyped = Workflow::newUntypedChildWorkflowStub('SimpleWorkflow'); + $result[] = yield $untyped->execute(['untyped']); + + $execution = yield $untyped->getExecution(); + assert($execution instanceof Workflow\WorkflowExecution); + + return $result; + } +} diff --git a/tests/temporal/Workflow/ComplexExceptionalWorkflow.php b/tests/temporal/Workflow/ComplexExceptionalWorkflow.php new file mode 100644 index 00000000..bf65ccb2 --- /dev/null +++ b/tests/temporal/Workflow/ComplexExceptionalWorkflow.php @@ -0,0 +1,26 @@ +<?php + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityOptions; +use Temporal\Common\RetryOptions; +use Temporal\Tests\Activity\SimpleActivity; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; + +#[Workflow\WorkflowInterface] +class ComplexExceptionalWorkflow +{ + #[WorkflowMethod(name: 'ComplexExceptionalWorkflow')] + public function handler() + { + $child = Workflow::newChildWorkflowStub( + ExceptionalActivityWorkflow::class, + Workflow\ChildWorkflowOptions::new()->withRetryOptions( + (new RetryOptions())->withMaximumAttempts(1) + ) + ); + + return yield $child->handler(); + } +} diff --git a/tests/temporal/Workflow/ContinuableWorkflow.php b/tests/temporal/Workflow/ContinuableWorkflow.php new file mode 100644 index 00000000..78411414 --- /dev/null +++ b/tests/temporal/Workflow/ContinuableWorkflow.php @@ -0,0 +1,38 @@ +<?php + + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityOptions; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; +use Temporal\Tests\Activity\SimpleActivity; + +#[Workflow\WorkflowInterface] +class ContinuableWorkflow +{ + #[WorkflowMethod(name: 'ContinuableWorkflow')] + public function handler( + int $generation + ) { + $simple = Workflow::newActivityStub( + SimpleActivity::class, + ActivityOptions::new()->withStartToCloseTimeout(5) + ); + + if ($generation > 5) { + // complete + return "OK" . $generation; + } + + if ($generation !== 1) { + assert(!empty(Workflow::getInfo()->continuedExecutionRunId)); + } + + for ($i = 0; $i < $generation; $i++) { + yield $simple->echo((string)$generation); + } + + return Workflow::continueAsNew('ContinuableWorkflow', [++$generation]); + } +} diff --git a/tests/temporal/Workflow/EmptyWorkflow.php b/tests/temporal/Workflow/EmptyWorkflow.php new file mode 100644 index 00000000..57fb5e65 --- /dev/null +++ b/tests/temporal/Workflow/EmptyWorkflow.php @@ -0,0 +1,16 @@ +<?php + +namespace Temporal\Tests\Workflow; + +use Temporal\Workflow\WorkflowMethod; +use Temporal\Workflow; + +#[Workflow\WorkflowInterface] +class EmptyWorkflow +{ + #[WorkflowMethod] + public function handler() + { + return 42; + } +} diff --git a/tests/temporal/Workflow/ExceptionalActivityWorkflow.php b/tests/temporal/Workflow/ExceptionalActivityWorkflow.php new file mode 100644 index 00000000..e0ed0005 --- /dev/null +++ b/tests/temporal/Workflow/ExceptionalActivityWorkflow.php @@ -0,0 +1,25 @@ +<?php + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityOptions; +use Temporal\Common\RetryOptions; +use Temporal\Tests\Activity\SimpleActivity; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; + +#[Workflow\WorkflowInterface] +class ExceptionalActivityWorkflow +{ + #[WorkflowMethod(name: 'ExceptionalActivityWorkflow')] + public function handler() + { + $simple = Workflow::newActivityStub( + SimpleActivity::class, + ActivityOptions::new()->withStartToCloseTimeout(5) + ->withRetryOptions((new RetryOptions())->withMaximumAttempts(1)) + ); + + return yield $simple->fail(); + } +} diff --git a/tests/temporal/Workflow/ExceptionalWorkflow.php b/tests/temporal/Workflow/ExceptionalWorkflow.php new file mode 100644 index 00000000..9a3e907f --- /dev/null +++ b/tests/temporal/Workflow/ExceptionalWorkflow.php @@ -0,0 +1,18 @@ +<?php + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityOptions; +use Temporal\Tests\Activity\SimpleActivity; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; + +#[Workflow\WorkflowInterface] +class ExceptionalWorkflow +{ + #[WorkflowMethod(name: 'ExceptionalWorkflow')] + public function handler() + { + throw new \RuntimeException("workflow error"); + } +} diff --git a/tests/temporal/Workflow/FailedHeartbeatWorkflow.php b/tests/temporal/Workflow/FailedHeartbeatWorkflow.php new file mode 100644 index 00000000..e857f100 --- /dev/null +++ b/tests/temporal/Workflow/FailedHeartbeatWorkflow.php @@ -0,0 +1,30 @@ +<?php + +declare(strict_types=1); + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityOptions; +use Temporal\Common\RetryOptions; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; +use Temporal\Tests\Activity\HeartBeatActivity; + +#[Workflow\WorkflowInterface] +class FailedHeartbeatWorkflow +{ + #[WorkflowMethod(name: 'FailedHeartbeatWorkflow')] + public function handler( + int $iterations + ): iterable { + $act = Workflow::newActivityStub( + HeartBeatActivity::class, + ActivityOptions::new() + ->withStartToCloseTimeout(50) + // will fail on first attempt + ->withRetryOptions(RetryOptions::new()->withMaximumAttempts(2)) + ); + + return yield $act->failedActivity($iterations); + } +} diff --git a/tests/temporal/Workflow/LoopWithSignalCoroutinesWorkflow.php b/tests/temporal/Workflow/LoopWithSignalCoroutinesWorkflow.php new file mode 100644 index 00000000..c389fd78 --- /dev/null +++ b/tests/temporal/Workflow/LoopWithSignalCoroutinesWorkflow.php @@ -0,0 +1,55 @@ +<?php + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityOptions; +use Temporal\Tests\Activity\SimpleActivity; +use Temporal\Workflow; +use Temporal\Workflow\SignalMethod; +use Temporal\Workflow\WorkflowMethod; + +#[Workflow\WorkflowInterface] +class LoopWithSignalCoroutinesWorkflow +{ + private array $values = []; + private array $result = []; + private $simple; + + public function __construct() + { + $this->simple = Workflow::newActivityStub( + SimpleActivity::class, + ActivityOptions::new()->withStartToCloseTimeout(5) + ); + } + + #[SignalMethod] + public function addValue( + string $value + ) { + $value = yield $this->simple->prefix('in signal ', $value); + $value = yield $this->simple->prefix('in signal 2 ', $value); + + $this->values[] = $value; + } + + #[WorkflowMethod(name: 'LoopWithSignalCoroutinesWorkflow')] + public function run( + int $count + ) { + while (true) { + yield Workflow::await(fn() => $this->values !== []); + $value = array_shift($this->values); + + // uppercases + $this->result[] = yield $this->simple->echo($value); + + if (count($this->result) === $count) { + break; + } + } + + asort($this->result); + return array_values($this->result); + } +} diff --git a/tests/temporal/Workflow/LoopWorkflow.php b/tests/temporal/Workflow/LoopWorkflow.php new file mode 100644 index 00000000..97d7a3aa --- /dev/null +++ b/tests/temporal/Workflow/LoopWorkflow.php @@ -0,0 +1,51 @@ +<?php + + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityOptions; +use Temporal\Tests\Activity\SimpleActivity; +use Temporal\Workflow; +use Temporal\Workflow\SignalMethod; +use Temporal\Workflow\WorkflowMethod; + +#[Workflow\WorkflowInterface] +class LoopWorkflow +{ + private array $values = []; + private array $result = []; + private $simple; + + public function __construct() + { + $this->simple = Workflow::newActivityStub( + SimpleActivity::class, + ActivityOptions::new()->withStartToCloseTimeout(5) + ); + } + + #[SignalMethod] + public function addValue( + string $value + ) { + $this->values[] = $value; + } + + #[WorkflowMethod(name: 'LoopWorkflow')] + public function run( + int $count + ) { + while (true) { + yield Workflow::await(fn() => $this->values !== []); + $value = array_shift($this->values); + + $this->result[] = yield $this->simple->echo($value); + + if (count($this->result) === $count) { + break; + } + } + + return $this->result; + } +} diff --git a/tests/temporal/Workflow/ParallelScopesWorkflow.php b/tests/temporal/Workflow/ParallelScopesWorkflow.php new file mode 100644 index 00000000..8a2303f4 --- /dev/null +++ b/tests/temporal/Workflow/ParallelScopesWorkflow.php @@ -0,0 +1,36 @@ +<?php + +declare(strict_types=1); + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityOptions; +use Temporal\Promise; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; +use Temporal\Tests\Activity\SimpleActivity; + +#[Workflow\WorkflowInterface] +class ParallelScopesWorkflow +{ + #[WorkflowMethod(name: 'ParallelScopesWorkflow')] + public function handler(string $input) + { + $simple = Workflow::newActivityStub( + SimpleActivity::class, + ActivityOptions::new()->withStartToCloseTimeout(5) + ); + + $a = Workflow::newCancellationScope(function () use ($simple, $input) { + return yield $simple->echo($input); + }); + + $b = Workflow::newCancellationScope(function () use ($simple, $input) { + return yield $simple->lower($input); + }); + + [$ra, $rb] = yield Promise::all([$a, $b]); + + return sprintf('%s|%s|%s', $ra, $input, $rb); + } +} diff --git a/tests/temporal/Workflow/PeriodicWorkflow.php b/tests/temporal/Workflow/PeriodicWorkflow.php new file mode 100644 index 00000000..08f5f2fa --- /dev/null +++ b/tests/temporal/Workflow/PeriodicWorkflow.php @@ -0,0 +1,19 @@ +<?php + +namespace Temporal\Tests\Workflow; + +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; + +#[Workflow\WorkflowInterface] +class PeriodicWorkflow +{ + #[WorkflowMethod(name: 'PeriodicWorkflow')] + public function handler() + { + error_log("GOT SOMETHING" . print_r(Workflow::getLastCompletionResult(), true)); + + // todo: get last completion result + return 'OK'; + } +} diff --git a/tests/temporal/Workflow/ProtoPayloadWorkflow.php b/tests/temporal/Workflow/ProtoPayloadWorkflow.php new file mode 100644 index 00000000..7adbed1e --- /dev/null +++ b/tests/temporal/Workflow/ProtoPayloadWorkflow.php @@ -0,0 +1,33 @@ +<?php + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityOptions; +use Temporal\Api\Common\V1\WorkflowExecution; +use Temporal\Tests\Activity\SimpleActivity; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; + +#[Workflow\WorkflowInterface] +class ProtoPayloadWorkflow +{ + #[WorkflowMethod(name: 'ProtoPayloadWorkflow')] + public function handler(): iterable + { + $simple = Workflow::newActivityStub( + SimpleActivity::class, + ActivityOptions::new()->withStartToCloseTimeout(5) + ); + + $e = new WorkflowExecution(); + $e->setWorkflowId('workflow id'); + $e->setRunId('run id'); + + /** @var WorkflowExecution $e2 */ + $e2 = yield $simple->updateRunID($e); + assert($e2->getWorkflowId() === $e->getWorkflowId()); + assert($e2->getRunId() === 'updated'); + + return $e2; + } +} diff --git a/tests/temporal/Workflow/QueryWorkflow.php b/tests/temporal/Workflow/QueryWorkflow.php new file mode 100644 index 00000000..96e41582 --- /dev/null +++ b/tests/temporal/Workflow/QueryWorkflow.php @@ -0,0 +1,41 @@ +<?php + +/** + * This file is part of Temporal package. + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Temporal\Tests\Workflow; + +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; + +#[Workflow\WorkflowInterface] +class QueryWorkflow +{ + private int $counter = 0; + + #[Workflow\SignalMethod(name: "add")] + public function add( + int $value + ) { + $this->counter += $value; + } + + #[Workflow\QueryMethod(name: "get")] + public function get(): int + { + return $this->counter; + } + + #[WorkflowMethod] + public function handler() + { + // collect signals during one second + yield Workflow::timer(1); + + return $this->counter; + } +} diff --git a/tests/temporal/Workflow/RuntimeSignalWorkflow.php b/tests/temporal/Workflow/RuntimeSignalWorkflow.php new file mode 100644 index 00000000..f700af72 --- /dev/null +++ b/tests/temporal/Workflow/RuntimeSignalWorkflow.php @@ -0,0 +1,31 @@ +<?php + +namespace Temporal\Tests\Workflow; + +use React\Promise\Deferred; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; + +#[Workflow\WorkflowInterface] +class RuntimeSignalWorkflow +{ + #[WorkflowMethod] + public function handler() + { + $wait1 = new Deferred(); + $wait2 = new Deferred(); + + $counter = 0; + + Workflow::registerSignal('add', function ($value) use (&$counter, $wait1, $wait2) { + $counter += $value; + $wait1->resolve($value); + $wait2->resolve($value); + }); + + yield $wait1; + yield $wait2; + + return $counter; + } +} diff --git a/tests/temporal/Workflow/SagaWorkflow.php b/tests/temporal/Workflow/SagaWorkflow.php new file mode 100644 index 00000000..e47c0203 --- /dev/null +++ b/tests/temporal/Workflow/SagaWorkflow.php @@ -0,0 +1,54 @@ +<?php + +/** + * This file is part of Temporal package. + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityOptions; +use Temporal\Common\RetryOptions; +use Temporal\Tests\Activity\SimpleActivity; +use Temporal\Workflow; + +#[Workflow\WorkflowInterface] +class SagaWorkflow +{ + #[Workflow\WorkflowMethod(name: 'SagaWorkflow')] + public function run() + { + $simple = Workflow::newActivityStub( + SimpleActivity::class, + ActivityOptions::new() + ->withStartToCloseTimeout(60) + ->withRetryOptions(RetryOptions::new()->withMaximumAttempts(1)) + ); + + $saga = new Workflow\Saga(); + $saga->setParallelCompensation(true); + + try { + yield $simple->echo('test'); + $saga->addCompensation( + function () use ($simple) { + yield $simple->echo('compensate echo'); + } + ); + + yield $simple->lower('TEST'); + $saga->addCompensation( + function () use ($simple) { + yield $simple->lower('COMPENSATE LOWER'); + } + ); + + yield $simple->fail(); + } catch (\Throwable $e) { + yield $saga->compensate(); + throw $e; + } + } +} diff --git a/tests/temporal/Workflow/SideEffectWorkflow.php b/tests/temporal/Workflow/SideEffectWorkflow.php new file mode 100644 index 00000000..95d396e4 --- /dev/null +++ b/tests/temporal/Workflow/SideEffectWorkflow.php @@ -0,0 +1,30 @@ +<?php + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityOptions; +use Temporal\Common\Uuid; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; +use Temporal\Tests\Activity\SimpleActivity; + +#[Workflow\WorkflowInterface] +class SideEffectWorkflow +{ + #[WorkflowMethod(name: 'SideEffectWorkflow')] + public function handler(string $input): iterable + { + $simple = Workflow::newActivityStub( + SimpleActivity::class, + ActivityOptions::new()->withStartToCloseTimeout(5) + ); + + $result = yield Workflow::sideEffect( + function () use ($input) { + return $input . '-42'; + } + ); + + return yield $simple->lower($result); + } +} diff --git a/tests/temporal/Workflow/SignalChildViaStubWorkflow.php b/tests/temporal/Workflow/SignalChildViaStubWorkflow.php new file mode 100644 index 00000000..828086fc --- /dev/null +++ b/tests/temporal/Workflow/SignalChildViaStubWorkflow.php @@ -0,0 +1,25 @@ +<?php + +namespace Temporal\Tests\Workflow; + +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; + +#[Workflow\WorkflowInterface] +class SignalChildViaStubWorkflow +{ + #[WorkflowMethod(name: 'SignalChildViaStubWorkflow')] + public function handler() + { + // typed stub + $simple = Workflow::newChildWorkflowStub(SimpleSignalledWorkflow::class); + + // start execution + $call = $simple->handler(); + + yield $simple->add(8); + + // expects 8 + return yield $call; + } +} diff --git a/tests/temporal/Workflow/SimpleDTOWorkflow.php b/tests/temporal/Workflow/SimpleDTOWorkflow.php new file mode 100644 index 00000000..bd39a0a0 --- /dev/null +++ b/tests/temporal/Workflow/SimpleDTOWorkflow.php @@ -0,0 +1,35 @@ +<?php + +declare(strict_types=1); + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityOptions; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; +use Temporal\Tests\Activity\SimpleActivity; +use Temporal\Tests\DTO\Message; +use Temporal\Tests\DTO\User; + +#[Workflow\WorkflowInterface] +class SimpleDTOWorkflow +{ + #[WorkflowMethod(name: 'SimpleDTOWorkflow')]//, returnType: Message::class)] + public function handler( + User $user + ) { + $simple = Workflow::newActivityStub( + SimpleActivity::class, + ActivityOptions::new() + ->withStartToCloseTimeout(5) + ); + + $value = yield $simple->greet($user); + + if (!$value instanceof Message) { + return "FAIL"; + } + + return $value; + } +} diff --git a/tests/temporal/Workflow/SimpleHeartbeatWorkflow.php b/tests/temporal/Workflow/SimpleHeartbeatWorkflow.php new file mode 100644 index 00000000..c9999cd1 --- /dev/null +++ b/tests/temporal/Workflow/SimpleHeartbeatWorkflow.php @@ -0,0 +1,25 @@ +<?php + +declare(strict_types=1); + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityOptions; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; +use Temporal\Tests\Activity\HeartBeatActivity; + +#[Workflow\WorkflowInterface] +class SimpleHeartbeatWorkflow +{ + #[WorkflowMethod(name: 'SimpleHeartbeatWorkflow')] + public function handler(int $iterations): iterable + { + $act = Workflow::newActivityStub( + HeartBeatActivity::class, + ActivityOptions::new()->withStartToCloseTimeout(50) + ); + + return yield $act->doSomething($iterations); + } +} diff --git a/tests/temporal/Workflow/SimpleSignalledWorkflow.php b/tests/temporal/Workflow/SimpleSignalledWorkflow.php new file mode 100644 index 00000000..0df25a65 --- /dev/null +++ b/tests/temporal/Workflow/SimpleSignalledWorkflow.php @@ -0,0 +1,30 @@ +<?php + +declare(strict_types=1); + +namespace Temporal\Tests\Workflow; + +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; + +#[Workflow\WorkflowInterface] +class SimpleSignalledWorkflow +{ + private $counter = 0; + + #[Workflow\SignalMethod(name: "add")] + public function add( + int $value + ) { + $this->counter += $value; + } + + #[WorkflowMethod(name: 'SimpleSignalledWorkflow')] + public function handler(): iterable + { + // collect signals during one second + yield Workflow::timer(1); + + return $this->counter; + } +} diff --git a/tests/temporal/Workflow/SimpleSignalledWorkflowWithSleep.php b/tests/temporal/Workflow/SimpleSignalledWorkflowWithSleep.php new file mode 100644 index 00000000..d10ba04a --- /dev/null +++ b/tests/temporal/Workflow/SimpleSignalledWorkflowWithSleep.php @@ -0,0 +1,34 @@ +<?php + +declare(strict_types=1); + +namespace Temporal\Tests\Workflow; + +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; + +#[Workflow\WorkflowInterface] +class SimpleSignalledWorkflowWithSleep +{ + private $counter = 0; + + #[Workflow\SignalMethod(name: "add")] + public function add( + int $value + ) { + $this->counter += $value; + } + + #[WorkflowMethod(name: 'SimpleSignalledWorkflowWithSleep')] + public function handler(): iterable + { + // collect signals during one second + yield Workflow::timer(1); + + if (!Workflow::isReplaying()) { + sleep(1); + } + + return $this->counter; + } +} diff --git a/tests/temporal/Workflow/SimpleWorkflow.php b/tests/temporal/Workflow/SimpleWorkflow.php new file mode 100644 index 00000000..36e12f69 --- /dev/null +++ b/tests/temporal/Workflow/SimpleWorkflow.php @@ -0,0 +1,31 @@ +<?php + +declare(strict_types=1); + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityOptions; +use Temporal\Common\RetryOptions; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; +use Temporal\Tests\Activity\SimpleActivity; + +#[Workflow\WorkflowInterface] +class SimpleWorkflow +{ + #[WorkflowMethod(name: 'SimpleWorkflow')] + public function handler( + string $input + ): iterable { + $simple = Workflow::newActivityStub( + SimpleActivity::class, + ActivityOptions::new() + ->withStartToCloseTimeout(5) + ->withRetryOptions( + RetryOptions::new()->withMaximumAttempts(2) + ) + ); + + return yield $simple->echo($input); + } +} diff --git a/tests/temporal/Workflow/TimerWorkflow.php b/tests/temporal/Workflow/TimerWorkflow.php new file mode 100644 index 00000000..ab60d6c9 --- /dev/null +++ b/tests/temporal/Workflow/TimerWorkflow.php @@ -0,0 +1,27 @@ +<?php + +declare(strict_types=1); + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityOptions; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; +use Temporal\Tests\Activity\SimpleActivity; + +#[Workflow\WorkflowInterface] +class TimerWorkflow +{ + #[WorkflowMethod(name: 'TimerWorkflow')] + public function handler(string $input): iterable + { + $simple = Workflow::newActivityStub( + SimpleActivity::class, + ActivityOptions::new()->withStartToCloseTimeout(5) + ); + + yield Workflow::timer(1); + + return yield $simple->lower($input); + } +} diff --git a/tests/temporal/Workflow/WaitWorkflow.php b/tests/temporal/Workflow/WaitWorkflow.php new file mode 100644 index 00000000..826952c1 --- /dev/null +++ b/tests/temporal/Workflow/WaitWorkflow.php @@ -0,0 +1,33 @@ +<?php + + +namespace Temporal\Tests\Workflow; + + +use Temporal\Workflow; +use Temporal\Workflow\SignalMethod; +use Temporal\Workflow\WorkflowInterface; +use Temporal\Workflow\WorkflowMethod; + +#[WorkflowInterface] +class WaitWorkflow +{ + private bool $ready = false; + private string $value; + + #[SignalMethod] + public function unlock( + string $value + ) { + $this->ready = true; + $this->value = $value; + } + + #[WorkflowMethod(name: 'WaitWorkflow')] + public function run() + { + yield Workflow::await(fn() => $this->ready); + + return $this->value; + } +} diff --git a/tests/temporal/Workflow/WithChildStubWorkflow.php b/tests/temporal/Workflow/WithChildStubWorkflow.php new file mode 100644 index 00000000..cdebe3d8 --- /dev/null +++ b/tests/temporal/Workflow/WithChildStubWorkflow.php @@ -0,0 +1,20 @@ +<?php + +declare(strict_types=1); + +namespace Temporal\Tests\Workflow; + +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; + +#[Workflow\WorkflowInterface] +class WithChildStubWorkflow +{ + #[WorkflowMethod(name: 'WithChildStubWorkflow')] + public function handler(string $input): iterable + { + $child = Workflow::newChildWorkflowStub(SimpleWorkflow::class); + + return 'Child: ' . (yield $child->handler('child ' . $input)); + } +} diff --git a/tests/temporal/Workflow/WithChildWorkflow.php b/tests/temporal/Workflow/WithChildWorkflow.php new file mode 100644 index 00000000..aac0979b --- /dev/null +++ b/tests/temporal/Workflow/WithChildWorkflow.php @@ -0,0 +1,25 @@ +<?php + +declare(strict_types=1); + +namespace Temporal\Tests\Workflow; + +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; + +#[Workflow\WorkflowInterface] +class WithChildWorkflow +{ + #[WorkflowMethod(name: 'WithChildWorkflow')] + public function handler( + string $input + ): iterable { + $result = yield Workflow::executeChildWorkflow( + 'SimpleWorkflow', + ['child ' . $input], + Workflow\ChildWorkflowOptions::new() + ); + + return 'Child: ' . $result; + } +} diff --git a/tests/temporal/Workflow/WorkflowWithSequence.php b/tests/temporal/Workflow/WorkflowWithSequence.php new file mode 100644 index 00000000..9e813a9c --- /dev/null +++ b/tests/temporal/Workflow/WorkflowWithSequence.php @@ -0,0 +1,30 @@ +<?php + + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityOptions; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; +use Temporal\Tests\Activity\SimpleActivity; + +#[Workflow\WorkflowInterface] +class WorkflowWithSequence +{ + #[WorkflowMethod(name: 'WorkflowWithSequence')] + public function handler() + { + $simple = Workflow::newActivityStub( + SimpleActivity::class, + ActivityOptions::new()->withStartToCloseTimeout(5) + ); + + $a = $simple->echo('a'); + $b = $simple->echo('b'); + + yield $a; + yield $b; + + return 'OK'; + } +} diff --git a/tests/temporal/Workflow/WorkflowWithSignalledSteps.php b/tests/temporal/Workflow/WorkflowWithSignalledSteps.php new file mode 100644 index 00000000..5f1af766 --- /dev/null +++ b/tests/temporal/Workflow/WorkflowWithSignalledSteps.php @@ -0,0 +1,51 @@ +<?php + + +namespace Temporal\Tests\Workflow; + +use React\Promise\Deferred; +use React\Promise\PromiseInterface; +use Temporal\Activity\ActivityOptions; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; +use Temporal\Tests\Activity\SimpleActivity; + +#[Workflow\WorkflowInterface] +class WorkflowWithSignalledSteps +{ + #[WorkflowMethod(name: 'WorkflowWithSignalledSteps')] + public function handler() + { + $simple = Workflow::newActivityStub( + SimpleActivity::class, + ActivityOptions::new()->withStartToCloseTimeout(5) + ); + + $value = 0; + Workflow::registerQuery('value', function () use (&$value) { + return $value; + }); + + yield $this->promiseSignal('begin'); + $value++; + + yield $this->promiseSignal('next1'); + $value++; + + yield $this->promiseSignal('next2'); + $value++; + + return $value; + } + + // is this correct? + private function promiseSignal(string $name): PromiseInterface + { + $signal = new Deferred(); + Workflow::registerSignal($name, function ($value) use ($signal) { + $signal->resolve($value); + }); + + return $signal->promise(); + } +} |