summaryrefslogtreecommitdiff
path: root/tests/plugins
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-01-27 13:56:28 +0300
committerValery Piashchynski <[email protected]>2021-01-27 13:56:28 +0300
commit744c2b0c86b88f77e681f8660bf3a476e83711b8 (patch)
treef7af7d7d494d1f5ca272af1ad0b978fe44d685a9 /tests/plugins
parente2266b80db47444ba5858c736833a8a81b1361ad (diff)
Move temporal plugin to the temporal repository
Diffstat (limited to 'tests/plugins')
-rw-r--r--tests/plugins/temporal/.rr.yaml22
-rw-r--r--tests/plugins/temporal/cancel_test.go291
-rw-r--r--tests/plugins/temporal/child_test.go84
-rw-r--r--tests/plugins/temporal/disaster_test.go114
-rw-r--r--tests/plugins/temporal/hp_test.go408
-rw-r--r--tests/plugins/temporal/query_test.go66
-rw-r--r--tests/plugins/temporal/server_test.go198
-rw-r--r--tests/plugins/temporal/signal_test.go170
-rw-r--r--tests/plugins/temporal/worker.php33
9 files changed, 0 insertions, 1386 deletions
diff --git a/tests/plugins/temporal/.rr.yaml b/tests/plugins/temporal/.rr.yaml
deleted file mode 100644
index 04d0730d..00000000
--- a/tests/plugins/temporal/.rr.yaml
+++ /dev/null
@@ -1,22 +0,0 @@
-# Application configuration
-rpc:
- listen: tcp://127.0.0.1:6001
-
-server:
- command: "php worker.php"
-
-# Workflow and activity mesh service
-temporal:
- address: "localhost:7233"
- activities:
- num_workers: 4
- codec: json
- debug_level: 2
-
-logs:
- mode: none
- channels:
- activities:
- mode: none
- #workflows:
- #mode: none \ No newline at end of file
diff --git a/tests/plugins/temporal/cancel_test.go b/tests/plugins/temporal/cancel_test.go
deleted file mode 100644
index 0fd3c126..00000000
--- a/tests/plugins/temporal/cancel_test.go
+++ /dev/null
@@ -1,291 +0,0 @@
-package tests
-
-import (
- "context"
- "testing"
- "time"
-
- "github.com/stretchr/testify/assert"
- "go.temporal.io/api/enums/v1"
- "go.temporal.io/api/history/v1"
- "go.temporal.io/sdk/client"
-)
-
-func Test_SimpleWorkflowCancel(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "SimpleSignalledWorkflow")
- assert.NoError(t, err)
-
- time.Sleep(time.Millisecond * 500)
- err = s.Client().CancelWorkflow(context.Background(), w.GetID(), w.GetRunID())
- assert.NoError(t, err)
-
- var result interface{}
- assert.Error(t, w.Get(context.Background(), &result))
-
- we, err := s.Client().DescribeWorkflowExecution(context.Background(), w.GetID(), w.GetRunID())
- assert.NoError(t, err)
-
- assert.Equal(t, "Canceled", we.WorkflowExecutionInfo.Status.String())
-}
-
-func Test_CancellableWorkflowScope(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "CancelledScopeWorkflow",
- "Hello World",
- )
- assert.NoError(t, err)
-
- var result string
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, "yes", result)
-
- s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool {
- return event.EventType == enums.EVENT_TYPE_TIMER_CANCELED
- })
-
- s.AssertNotContainsEvent(t, w, func(event *history.HistoryEvent) bool {
- return event.EventType == enums.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED
- })
-}
-
-func Test_CancelledWorkflow(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "CancelledWorkflow",
- "Hello World",
- )
- assert.NoError(t, err)
-
- time.Sleep(time.Second)
- err = s.Client().CancelWorkflow(context.Background(), w.GetID(), w.GetRunID())
- assert.NoError(t, err)
-
- var result interface{}
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, "CANCELLED", result)
-}
-
-func Test_CancelledWithCompensationWorkflow(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "CancelledWithCompensationWorkflow",
- "Hello World",
- )
- assert.NoError(t, err)
-
- time.Sleep(time.Second)
- err = s.Client().CancelWorkflow(context.Background(), w.GetID(), w.GetRunID())
- assert.NoError(t, err)
-
- var result interface{}
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, "OK", result)
-
- e, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "getStatus")
- assert.NoError(t, err)
-
- trace := make([]string, 0)
- assert.NoError(t, e.Get(&trace))
- assert.Equal(
- t,
- []string{
- "yield",
- "rollback",
- "captured retry",
- "captured promise on cancelled",
- "START rollback",
- "WAIT ROLLBACK",
- "RESULT (ROLLBACK)", "DONE rollback",
- "COMPLETE rollback",
- "result: OK",
- },
- trace,
- )
-}
-
-func Test_CancelledNestedWorkflow(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "CancelledNestedWorkflow",
- )
- assert.NoError(t, err)
-
- time.Sleep(time.Second)
-
- err = s.Client().CancelWorkflow(context.Background(), w.GetID(), w.GetRunID())
- assert.NoError(t, err)
-
- var result interface{}
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, "CANCELLED", result)
-
- e, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "getStatus")
- assert.NoError(t, err)
-
- trace := make([]string, 0)
- assert.NoError(t, e.Get(&trace))
- assert.Equal(
- t,
- []string{
- "begin",
- "first scope",
- "second scope",
- "close second scope",
- "close first scope",
- "second scope cancelled",
- "first scope cancelled",
- "close process",
- },
- trace,
- )
-}
-
-func Test_CancelledNSingleScopeWorkflow(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "CancelledSingleScopeWorkflow",
- )
- assert.NoError(t, err)
-
- time.Sleep(time.Second)
-
- err = s.Client().CancelWorkflow(context.Background(), w.GetID(), w.GetRunID())
- assert.NoError(t, err)
-
- var result interface{}
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, "OK", result)
-
- e, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "getStatus")
- assert.NoError(t, err)
-
- trace := make([]string, 0)
- assert.NoError(t, e.Get(&trace))
- assert.Equal(
- t,
- []string{
- "start",
- "in scope",
- "on cancel",
- "captured in scope",
- "captured in process",
- },
- trace,
- )
-}
-
-func Test_CancelledMidflightWorkflow(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "CancelledMidflightWorkflow",
- )
- assert.NoError(t, err)
-
- var result interface{}
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, "OK", result)
-
- e, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "getStatus")
- assert.NoError(t, err)
-
- trace := make([]string, 0)
- assert.NoError(t, e.Get(&trace))
- assert.Equal(
- t,
- []string{
- "start",
- "in scope",
- "on cancel",
- "done cancel",
- },
- trace,
- )
-
- s.AssertNotContainsEvent(t, w, func(event *history.HistoryEvent) bool {
- return event.EventType == enums.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED
- })
-}
-
-func Test_CancelSignalledChildWorkflow(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "CancelSignalledChildWorkflow",
- )
- assert.NoError(t, err)
-
- var result interface{}
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, "cancelled ok", result)
-
- e, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "getStatus")
- assert.NoError(t, err)
-
- trace := make([]string, 0)
- assert.NoError(t, e.Get(&trace))
- assert.Equal(
- t,
- []string{
- "start",
- "child started",
- "child signalled",
- "scope cancelled",
- "process done",
- },
- trace,
- )
-
- s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool {
- return event.EventType == enums.EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED
- })
-}
diff --git a/tests/plugins/temporal/child_test.go b/tests/plugins/temporal/child_test.go
deleted file mode 100644
index 49521791..00000000
--- a/tests/plugins/temporal/child_test.go
+++ /dev/null
@@ -1,84 +0,0 @@
-package tests
-
-import (
- "context"
- "testing"
-
- "github.com/stretchr/testify/assert"
- "go.temporal.io/sdk/client"
-)
-
-func Test_ExecuteChildWorkflow(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "WithChildWorkflow",
- "Hello World",
- )
- assert.NoError(t, err)
-
- var result string
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, "Child: CHILD HELLO WORLD", result)
-}
-
-func Test_ExecuteChildStubWorkflow(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "WithChildStubWorkflow",
- "Hello World",
- )
- assert.NoError(t, err)
-
- var result string
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, "Child: CHILD HELLO WORLD", result)
-}
-
-func Test_ExecuteChildStubWorkflow_02(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "ChildStubWorkflow",
- "Hello World",
- )
- assert.NoError(t, err)
-
- var result []string
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, []string{"HELLO WORLD", "UNTYPED"}, result)
-}
-
-func Test_SignalChildViaStubWorkflow(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "SignalChildViaStubWorkflow",
- )
- assert.NoError(t, err)
-
- var result int
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, 8, result)
-}
diff --git a/tests/plugins/temporal/disaster_test.go b/tests/plugins/temporal/disaster_test.go
deleted file mode 100644
index 9ca4d018..00000000
--- a/tests/plugins/temporal/disaster_test.go
+++ /dev/null
@@ -1,114 +0,0 @@
-package tests
-
-import (
- "context"
- "os"
- "testing"
- "time"
-
- "github.com/stretchr/testify/assert"
- "go.temporal.io/sdk/client"
-)
-
-func Test_WorkerError_DisasterRecovery(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- p, err := os.FindProcess(int(s.workflows.Workers()[0].Pid()))
- assert.NoError(t, err)
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "TimerWorkflow",
- "Hello World",
- )
- assert.NoError(t, err)
-
- time.Sleep(time.Millisecond * 750)
-
- // must fully recover with new worker
- assert.NoError(t, p.Kill())
-
- var result string
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, "hello world", result)
-}
-
-func Test_WorkerError_DisasterRecovery_Heavy(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- defer func() {
- // always restore script
- _ = os.Rename("worker.bak", "worker.php")
- }()
-
- // Makes worker pool unable to recover for some time
- _ = os.Rename("worker.php", "worker.bak")
-
- p, err := os.FindProcess(int(s.workflows.Workers()[0].Pid()))
- assert.NoError(t, err)
-
- // must fully recover with new worker
- assert.NoError(t, p.Kill())
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "TimerWorkflow",
- "Hello World",
- )
- assert.NoError(t, err)
-
- time.Sleep(time.Millisecond * 750)
-
- // restore the script and recover activity pool
- _ = os.Rename("worker.bak", "worker.php")
-
- var result string
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, "hello world", result)
-}
-
-func Test_ActivityError_DisasterRecovery(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- defer func() {
- // always restore script
- _ = os.Rename("worker.bak", "worker.php")
- }()
-
- // Makes worker pool unable to recover for some time
- _ = os.Rename("worker.php", "worker.bak")
-
- // destroys all workers in activities
- for _, wrk := range s.activities.Workers() {
- assert.NoError(t, wrk.Kill())
- }
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "SimpleWorkflow",
- "Hello World",
- )
- assert.NoError(t, err)
-
- // activity can't complete at this moment
- time.Sleep(time.Millisecond * 750)
-
- // restore the script and recover activity pool
- _ = os.Rename("worker.bak", "worker.php")
-
- var result string
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, "HELLO WORLD", result)
-}
diff --git a/tests/plugins/temporal/hp_test.go b/tests/plugins/temporal/hp_test.go
deleted file mode 100644
index bceac025..00000000
--- a/tests/plugins/temporal/hp_test.go
+++ /dev/null
@@ -1,408 +0,0 @@
-package tests
-
-import (
- "context"
- "crypto/rand"
- "crypto/sha512"
- "fmt"
- "testing"
- "time"
-
- "go.temporal.io/api/common/v1"
-
- "github.com/fatih/color"
- "github.com/stretchr/testify/assert"
- "go.temporal.io/api/enums/v1"
- "go.temporal.io/api/history/v1"
- "go.temporal.io/sdk/client"
-)
-
-func init() {
- color.NoColor = false
-}
-
-func Test_VerifyRegistration(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- assert.Contains(t, s.workflows.WorkflowNames(), "SimpleWorkflow")
-
- assert.Contains(t, s.activities.ActivityNames(), "SimpleActivity.echo")
- assert.Contains(t, s.activities.ActivityNames(), "HeartBeatActivity.doSomething")
-
- assert.Contains(t, s.activities.ActivityNames(), "SimpleActivity.lower")
-}
-
-func Test_ExecuteSimpleWorkflow_1(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "SimpleWorkflow",
- "Hello World",
- )
- assert.NoError(t, err)
-
- var result string
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, "HELLO WORLD", result)
-}
-
-type User struct {
- Name string
- Email string
-}
-
-func Test_ExecuteSimpleDTOWorkflow(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "SimpleDTOWorkflow",
- User{
- Name: "Antony",
- Email: "[email protected]",
- },
- )
- assert.NoError(t, err)
-
- var result struct{ Message string }
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, "Hello Antony <[email protected]>", result.Message)
-}
-
-func Test_ExecuteSimpleWorkflowWithSequenceInBatch(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "WorkflowWithSequence",
- "Hello World",
- )
- assert.NoError(t, err)
-
- var result string
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, "OK", result)
-}
-
-func Test_MultipleWorkflowsInSingleWorker(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "SimpleWorkflow",
- "Hello World",
- )
- assert.NoError(t, err)
-
- w2, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "TimerWorkflow",
- "Hello World",
- )
- assert.NoError(t, err)
-
- var result string
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, "HELLO WORLD", result)
-
- assert.NoError(t, w2.Get(context.Background(), &result))
- assert.Equal(t, "hello world", result)
-}
-
-func Test_ExecuteWorkflowWithParallelScopes(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "ParallelScopesWorkflow",
- "Hello World",
- )
- assert.NoError(t, err)
-
- var result string
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, "HELLO WORLD|Hello World|hello world", result)
-}
-
-func Test_Timer(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- start := time.Now()
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "TimerWorkflow",
- "Hello World",
- )
- assert.NoError(t, err)
-
- var result string
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, "hello world", result)
- assert.True(t, time.Since(start).Seconds() > 1)
-
- s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool {
- return event.EventType == enums.EVENT_TYPE_TIMER_STARTED
- })
-}
-
-func Test_SideEffect(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "SideEffectWorkflow",
- "Hello World",
- )
- assert.NoError(t, err)
-
- var result string
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Contains(t, result, "hello world-")
-
- s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool {
- return event.EventType == enums.EVENT_TYPE_MARKER_RECORDED
- })
-}
-
-func Test_EmptyWorkflow(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "EmptyWorkflow",
- "Hello World",
- )
- assert.NoError(t, err)
-
- var result int
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, 42, result)
-}
-
-func Test_PromiseChaining(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "ChainedWorkflow",
- "Hello World",
- )
- assert.NoError(t, err)
-
- var result string
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, "result:hello world", result)
-}
-
-func Test_ActivityHeartbeat(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "SimpleHeartbeatWorkflow",
- 2,
- )
- assert.NoError(t, err)
-
- time.Sleep(time.Second)
-
- we, err := s.Client().DescribeWorkflowExecution(context.Background(), w.GetID(), w.GetRunID())
- assert.NoError(t, err)
- assert.Len(t, we.PendingActivities, 1)
-
- act := we.PendingActivities[0]
-
- assert.Equal(t, `{"value":2}`, string(act.HeartbeatDetails.Payloads[0].Data))
-
- var result string
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, "OK", result)
-}
-
-func Test_FailedActivityHeartbeat(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "FailedHeartbeatWorkflow",
- 8,
- )
- assert.NoError(t, err)
-
- time.Sleep(time.Second)
-
- we, err := s.Client().DescribeWorkflowExecution(context.Background(), w.GetID(), w.GetRunID())
- assert.NoError(t, err)
- assert.Len(t, we.PendingActivities, 1)
-
- act := we.PendingActivities[0]
-
- assert.Equal(t, `{"value":8}`, string(act.HeartbeatDetails.Payloads[0].Data))
-
- var result string
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, "OK!", result)
-}
-
-func Test_BinaryPayload(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- rnd := make([]byte, 2500)
-
- _, err := rand.Read(rnd)
- assert.NoError(t, err)
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "BinaryWorkflow",
- rnd,
- )
- assert.NoError(t, err)
-
- var result string
- assert.NoError(t, w.Get(context.Background(), &result))
-
- assert.Equal(t, fmt.Sprintf("%x", sha512.Sum512(rnd)), result)
-}
-
-func Test_ContinueAsNew(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "ContinuableWorkflow",
- 1,
- )
- assert.NoError(t, err)
-
- time.Sleep(time.Second)
-
- we, err := s.Client().DescribeWorkflowExecution(context.Background(), w.GetID(), w.GetRunID())
- assert.NoError(t, err)
-
- assert.Equal(t, "ContinuedAsNew", we.WorkflowExecutionInfo.Status.String())
-
- time.Sleep(time.Second)
-
- // the result of the final workflow
- var result string
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, "OK6", result)
-}
-
-func Test_ActivityStubWorkflow(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "ActivityStubWorkflow",
- "hello world",
- )
- assert.NoError(t, err)
-
- // the result of the final workflow
- var result []string
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, []string{
- "HELLO WORLD",
- "invalid method call",
- "UNTYPED",
- }, result)
-}
-
-func Test_ExecuteProtoWorkflow(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "ProtoPayloadWorkflow",
- )
- assert.NoError(t, err)
-
- var result common.WorkflowExecution
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, "updated", result.RunId)
- assert.Equal(t, "workflow id", result.WorkflowId)
-}
-
-func Test_SagaWorkflow(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "SagaWorkflow",
- )
- assert.NoError(t, err)
-
- var result string
- assert.Error(t, w.Get(context.Background(), &result))
-}
diff --git a/tests/plugins/temporal/query_test.go b/tests/plugins/temporal/query_test.go
deleted file mode 100644
index 8b0caeee..00000000
--- a/tests/plugins/temporal/query_test.go
+++ /dev/null
@@ -1,66 +0,0 @@
-package tests
-
-import (
- "context"
- "testing"
- "time"
-
- "github.com/stretchr/testify/assert"
- "go.temporal.io/sdk/client"
-)
-
-func Test_ListQueries(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "QueryWorkflow",
- "Hello World",
- )
- assert.NoError(t, err)
-
- time.Sleep(time.Millisecond * 500)
-
- v, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "error", -1)
- assert.Nil(t, v)
- assert.Error(t, err)
-
- assert.Contains(t, err.Error(), "KnownQueryTypes=[get]")
-
- var r int
- assert.NoError(t, w.Get(context.Background(), &r))
- assert.Equal(t, 0, r)
-}
-
-func Test_GetQuery(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "QueryWorkflow",
- "Hello World",
- )
- assert.NoError(t, err)
-
- err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "add", 88)
- assert.NoError(t, err)
- time.Sleep(time.Millisecond * 500)
-
- v, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "get", nil)
- assert.NoError(t, err)
-
- var r int
- assert.NoError(t, v.Get(&r))
- assert.Equal(t, 88, r)
-
- assert.NoError(t, w.Get(context.Background(), &r))
- assert.Equal(t, 88, r)
-}
diff --git a/tests/plugins/temporal/server_test.go b/tests/plugins/temporal/server_test.go
deleted file mode 100644
index c8d815c3..00000000
--- a/tests/plugins/temporal/server_test.go
+++ /dev/null
@@ -1,198 +0,0 @@
-package tests
-
-import (
- "context"
- "testing"
-
- endure "github.com/spiral/endure/pkg/container"
- "github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/informer"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/spiral/roadrunner/v2/plugins/resetter"
- "github.com/spiral/roadrunner/v2/plugins/rpc"
- "github.com/spiral/roadrunner/v2/plugins/server"
- "github.com/spiral/roadrunner/v2/plugins/temporal/activity"
- rrClient "github.com/spiral/roadrunner/v2/plugins/temporal/client"
- "github.com/spiral/roadrunner/v2/plugins/temporal/workflow"
- "go.temporal.io/api/enums/v1"
- "go.temporal.io/api/history/v1"
- temporalClient "go.temporal.io/sdk/client"
- "go.uber.org/zap"
- "go.uber.org/zap/zapcore"
-)
-
-type TestServer struct {
- container endure.Container
- temporal rrClient.Temporal
- activities *activity.Plugin
- workflows *workflow.Plugin
-}
-
-type ConfigOption struct {
- Name string
- Value interface{}
-}
-
-func NewOption(name string, value interface{}) ConfigOption {
- return ConfigOption{Name: name, Value: value}
-}
-
-func NewTestServer(opt ...ConfigOption) *TestServer {
- e, err := endure.NewContainer(initLogger(), endure.RetryOnFail(false))
- if err != nil {
- panic(err)
- }
-
- t := &rrClient.Plugin{}
- a := &activity.Plugin{}
- w := &workflow.Plugin{}
-
- if err := e.Register(initConfig(opt...)); err != nil {
- panic(err)
- }
-
- if err := e.Register(&logger.ZapLogger{}); err != nil {
- panic(err)
- }
- if err := e.Register(&resetter.Plugin{}); err != nil {
- panic(err)
- }
- if err := e.Register(&informer.Plugin{}); err != nil {
- panic(err)
- }
- if err := e.Register(&server.Plugin{}); err != nil {
- panic(err)
- }
- if err := e.Register(&rpc.Plugin{}); err != nil {
- panic(err)
- }
-
- if err := e.Register(t); err != nil {
- panic(err)
- }
- if err := e.Register(a); err != nil {
- panic(err)
- }
- if err := e.Register(w); err != nil {
- panic(err)
- }
-
- if err := e.Init(); err != nil {
- panic(err)
- }
-
- errCh, err := e.Serve()
- if err != nil {
- panic(err)
- }
-
- go func() {
- err := <-errCh
- er := e.Stop()
- if er != nil {
- panic(err)
- }
- }()
-
- return &TestServer{container: e, temporal: t, activities: a, workflows: w}
-}
-
-func (s *TestServer) Client() temporalClient.Client {
- return s.temporal.GetClient()
-}
-
-func (s *TestServer) MustClose() {
- err := s.container.Stop()
- if err != nil {
- panic(err)
- }
-}
-
-func initConfig(opt ...ConfigOption) config.Configurer {
- cfg := &config.Viper{}
- cfg.Path = ".rr.yaml"
- cfg.Prefix = "rr"
-
- return cfg
-}
-
-func initLogger() *zap.Logger {
- cfg := zap.Config{
- Level: zap.NewAtomicLevelAt(zap.ErrorLevel),
- Encoding: "console",
- EncoderConfig: zapcore.EncoderConfig{
- MessageKey: "message",
- LevelKey: "level",
- TimeKey: "time",
- CallerKey: "caller",
- NameKey: "name",
- StacktraceKey: "stack",
- EncodeLevel: zapcore.CapitalLevelEncoder,
- EncodeTime: zapcore.ISO8601TimeEncoder,
- EncodeCaller: zapcore.ShortCallerEncoder,
- },
- OutputPaths: []string{"stderr"},
- ErrorOutputPaths: []string{"stderr"},
- }
-
- l, err := cfg.Build(zap.AddCaller())
- if err != nil {
- panic(err)
- }
-
- return l
-}
-
-func (s *TestServer) AssertContainsEvent(t *testing.T, w temporalClient.WorkflowRun, assert func(*history.HistoryEvent) bool) {
- i := s.Client().GetWorkflowHistory(
- context.Background(),
- w.GetID(),
- w.GetRunID(),
- false,
- enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT,
- )
-
- for {
- if !i.HasNext() {
- t.Error("no more events and no match found")
- break
- }
-
- e, err := i.Next()
- if err != nil {
- t.Error("unable to read history event")
- break
- }
-
- if assert(e) {
- break
- }
- }
-}
-
-func (s *TestServer) AssertNotContainsEvent(t *testing.T, w temporalClient.WorkflowRun, assert func(*history.HistoryEvent) bool) {
- i := s.Client().GetWorkflowHistory(
- context.Background(),
- w.GetID(),
- w.GetRunID(),
- false,
- enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT,
- )
-
- for {
- if !i.HasNext() {
- break
- }
-
- e, err := i.Next()
- if err != nil {
- t.Error("unable to read history event")
- break
- }
-
- if assert(e) {
- t.Error("found unexpected event")
- break
- }
- }
-}
diff --git a/tests/plugins/temporal/signal_test.go b/tests/plugins/temporal/signal_test.go
deleted file mode 100644
index 51826287..00000000
--- a/tests/plugins/temporal/signal_test.go
+++ /dev/null
@@ -1,170 +0,0 @@
-package tests
-
-import (
- "context"
- "testing"
- "time"
-
- "github.com/pborman/uuid"
- "github.com/stretchr/testify/assert"
- "go.temporal.io/api/enums/v1"
- "go.temporal.io/api/history/v1"
- "go.temporal.io/sdk/client"
-)
-
-func Test_SignalsWithoutSignals(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "SimpleSignalledWorkflow",
- "Hello World",
- )
- assert.NoError(t, err)
-
- var result int
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, 0, result)
-}
-
-func Test_SendSignalDuringTimer(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().SignalWithStartWorkflow(
- context.Background(),
- "signalled-"+uuid.New(),
- "add",
- 10,
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "SimpleSignalledWorkflow",
- )
- assert.NoError(t, err)
-
- err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "add", -1)
- assert.NoError(t, err)
-
- var result int
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, 9, result)
-
- s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool {
- if event.EventType == enums.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED {
- attr := event.Attributes.(*history.HistoryEvent_WorkflowExecutionSignaledEventAttributes)
- return attr.WorkflowExecutionSignaledEventAttributes.SignalName == "add"
- }
-
- return false
- })
-}
-
-func Test_SendSignalBeforeCompletingWorkflow(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "SimpleSignalledWorkflowWithSleep",
- )
- assert.NoError(t, err)
-
- // should be around sleep(1) call
- time.Sleep(time.Second + time.Millisecond*200)
-
- err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "add", -1)
- assert.NoError(t, err)
-
- var result int
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, -1, result)
-
- s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool {
- if event.EventType == enums.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED {
- attr := event.Attributes.(*history.HistoryEvent_WorkflowExecutionSignaledEventAttributes)
- return attr.WorkflowExecutionSignaledEventAttributes.SignalName == "add"
- }
-
- return false
- })
-}
-
-func Test_RuntimeSignal(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().SignalWithStartWorkflow(
- context.Background(),
- "signalled-"+uuid.New(),
- "add",
- -1,
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "RuntimeSignalWorkflow",
- )
- assert.NoError(t, err)
-
- var result int
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, -1, result)
-
- s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool {
- if event.EventType == enums.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED {
- attr := event.Attributes.(*history.HistoryEvent_WorkflowExecutionSignaledEventAttributes)
- return attr.WorkflowExecutionSignaledEventAttributes.SignalName == "add"
- }
-
- return false
- })
-}
-
-func Test_SignalSteps(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "WorkflowWithSignalledSteps",
- )
- assert.NoError(t, err)
-
- err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "begin", true)
- assert.NoError(t, err)
-
- err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "next1", true)
- assert.NoError(t, err)
-
- v, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "value", nil)
- assert.NoError(t, err)
-
- var r int
- assert.NoError(t, v.Get(&r))
- assert.Equal(t, 2, r)
-
- err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "next2", true)
- assert.NoError(t, err)
-
- v, err = s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "value", nil)
- assert.NoError(t, err)
-
- assert.NoError(t, v.Get(&r))
- assert.Equal(t, 3, r)
-
- var result int
- assert.NoError(t, w.Get(context.Background(), &result))
-
- // 3 ticks
- assert.Equal(t, 3, result)
-}
diff --git a/tests/plugins/temporal/worker.php b/tests/plugins/temporal/worker.php
deleted file mode 100644
index 0d0263e7..00000000
--- a/tests/plugins/temporal/worker.php
+++ /dev/null
@@ -1,33 +0,0 @@
-<?php
-
-declare(strict_types=1);
-
-require __DIR__ . '/../../vendor/autoload.php';
-
-/**
- * @param string $dir
- * @return array<string>
- */
-$getClasses = static function (string $dir): iterable {
- $files = glob($dir . '/*.php');
-
- foreach ($files as $file) {
- yield substr(basename($file), 0, -4);
- }
-};
-
-$factory = \Temporal\WorkerFactory::create();
-
-$worker = $factory->newWorker('default');
-
-// register all workflows
-foreach ($getClasses(__DIR__ . '/../../temporal/Workflow') as $name) {
- $worker->registerWorkflowType('Temporal\\Tests\\Workflow\\' . $name);
-}
-
-// register all activity
-foreach ($getClasses(__DIR__ . '/../../temporal/Activity') as $name) {
- $worker->registerActivityType('Temporal\\Tests\\Activity\\' . $name);
-}
-
-$factory->run();