summaryrefslogtreecommitdiff
path: root/tests/plugins
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-01-25 22:47:02 +0300
committerValery Piashchynski <[email protected]>2021-01-25 22:47:02 +0300
commit43071e43a0743ff8c7913bba7819952962124355 (patch)
treee3b61113d3c0d28f972c71592af8b2f708994167 /tests/plugins
parent5fd1168c687040ca7d72f4727ee1aec753d3f258 (diff)
Initial commit of the Temporal plugins set
Diffstat (limited to 'tests/plugins')
-rw-r--r--tests/plugins/http/handler_test.go8
-rw-r--r--tests/plugins/temporal/.rr.yaml22
-rw-r--r--tests/plugins/temporal/cancel_test.go291
-rw-r--r--tests/plugins/temporal/child_test.go84
-rw-r--r--tests/plugins/temporal/disaster_test.go114
-rw-r--r--tests/plugins/temporal/hp_test.go408
-rw-r--r--tests/plugins/temporal/query_test.go66
-rw-r--r--tests/plugins/temporal/server_test.go203
-rw-r--r--tests/plugins/temporal/signal_test.go170
-rw-r--r--tests/plugins/temporal/worker.php33
10 files changed, 1395 insertions, 4 deletions
diff --git a/tests/plugins/http/handler_test.go b/tests/plugins/http/handler_test.go
index e47dbd44..fc672e36 100644
--- a/tests/plugins/http/handler_test.go
+++ b/tests/plugins/http/handler_test.go
@@ -156,7 +156,7 @@ func TestHandler_Empty_User_Agent(t *testing.T) {
}, nil, p)
assert.NoError(t, err)
- hs := &http.Server{Addr: ":8088", Handler: h}
+ hs := &http.Server{Addr: ":19658", Handler: h}
defer func() {
err := hs.Shutdown(context.Background())
if err != nil {
@@ -172,7 +172,7 @@ func TestHandler_Empty_User_Agent(t *testing.T) {
}()
time.Sleep(time.Millisecond * 10)
- req, err := http.NewRequest("GET", "http://localhost:8088?hello=world", nil)
+ req, err := http.NewRequest("GET", "http://localhost:19658?hello=world", nil)
assert.NoError(t, err)
req.Header.Add("user-agent", "")
@@ -216,7 +216,7 @@ func TestHandler_User_Agent(t *testing.T) {
}, nil, p)
assert.NoError(t, err)
- hs := &http.Server{Addr: ":8088", Handler: h}
+ hs := &http.Server{Addr: ":25688", Handler: h}
defer func() {
err := hs.Shutdown(context.Background())
if err != nil {
@@ -232,7 +232,7 @@ func TestHandler_User_Agent(t *testing.T) {
}()
time.Sleep(time.Millisecond * 10)
- req, err := http.NewRequest("GET", "http://localhost:8088?hello=world", nil)
+ req, err := http.NewRequest("GET", "http://localhost:25688?hello=world", nil)
assert.NoError(t, err)
req.Header.Add("User-Agent", "go-agent")
diff --git a/tests/plugins/temporal/.rr.yaml b/tests/plugins/temporal/.rr.yaml
new file mode 100644
index 00000000..04d0730d
--- /dev/null
+++ b/tests/plugins/temporal/.rr.yaml
@@ -0,0 +1,22 @@
+# Application configuration
+rpc:
+ listen: tcp://127.0.0.1:6001
+
+server:
+ command: "php worker.php"
+
+# Workflow and activity mesh service
+temporal:
+ address: "localhost:7233"
+ activities:
+ num_workers: 4
+ codec: json
+ debug_level: 2
+
+logs:
+ mode: none
+ channels:
+ activities:
+ mode: none
+ #workflows:
+ #mode: none \ No newline at end of file
diff --git a/tests/plugins/temporal/cancel_test.go b/tests/plugins/temporal/cancel_test.go
new file mode 100644
index 00000000..0fd3c126
--- /dev/null
+++ b/tests/plugins/temporal/cancel_test.go
@@ -0,0 +1,291 @@
+package tests
+
+import (
+ "context"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+ "go.temporal.io/api/enums/v1"
+ "go.temporal.io/api/history/v1"
+ "go.temporal.io/sdk/client"
+)
+
+func Test_SimpleWorkflowCancel(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "SimpleSignalledWorkflow")
+ assert.NoError(t, err)
+
+ time.Sleep(time.Millisecond * 500)
+ err = s.Client().CancelWorkflow(context.Background(), w.GetID(), w.GetRunID())
+ assert.NoError(t, err)
+
+ var result interface{}
+ assert.Error(t, w.Get(context.Background(), &result))
+
+ we, err := s.Client().DescribeWorkflowExecution(context.Background(), w.GetID(), w.GetRunID())
+ assert.NoError(t, err)
+
+ assert.Equal(t, "Canceled", we.WorkflowExecutionInfo.Status.String())
+}
+
+func Test_CancellableWorkflowScope(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "CancelledScopeWorkflow",
+ "Hello World",
+ )
+ assert.NoError(t, err)
+
+ var result string
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, "yes", result)
+
+ s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool {
+ return event.EventType == enums.EVENT_TYPE_TIMER_CANCELED
+ })
+
+ s.AssertNotContainsEvent(t, w, func(event *history.HistoryEvent) bool {
+ return event.EventType == enums.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED
+ })
+}
+
+func Test_CancelledWorkflow(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "CancelledWorkflow",
+ "Hello World",
+ )
+ assert.NoError(t, err)
+
+ time.Sleep(time.Second)
+ err = s.Client().CancelWorkflow(context.Background(), w.GetID(), w.GetRunID())
+ assert.NoError(t, err)
+
+ var result interface{}
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, "CANCELLED", result)
+}
+
+func Test_CancelledWithCompensationWorkflow(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "CancelledWithCompensationWorkflow",
+ "Hello World",
+ )
+ assert.NoError(t, err)
+
+ time.Sleep(time.Second)
+ err = s.Client().CancelWorkflow(context.Background(), w.GetID(), w.GetRunID())
+ assert.NoError(t, err)
+
+ var result interface{}
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, "OK", result)
+
+ e, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "getStatus")
+ assert.NoError(t, err)
+
+ trace := make([]string, 0)
+ assert.NoError(t, e.Get(&trace))
+ assert.Equal(
+ t,
+ []string{
+ "yield",
+ "rollback",
+ "captured retry",
+ "captured promise on cancelled",
+ "START rollback",
+ "WAIT ROLLBACK",
+ "RESULT (ROLLBACK)", "DONE rollback",
+ "COMPLETE rollback",
+ "result: OK",
+ },
+ trace,
+ )
+}
+
+func Test_CancelledNestedWorkflow(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "CancelledNestedWorkflow",
+ )
+ assert.NoError(t, err)
+
+ time.Sleep(time.Second)
+
+ err = s.Client().CancelWorkflow(context.Background(), w.GetID(), w.GetRunID())
+ assert.NoError(t, err)
+
+ var result interface{}
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, "CANCELLED", result)
+
+ e, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "getStatus")
+ assert.NoError(t, err)
+
+ trace := make([]string, 0)
+ assert.NoError(t, e.Get(&trace))
+ assert.Equal(
+ t,
+ []string{
+ "begin",
+ "first scope",
+ "second scope",
+ "close second scope",
+ "close first scope",
+ "second scope cancelled",
+ "first scope cancelled",
+ "close process",
+ },
+ trace,
+ )
+}
+
+func Test_CancelledNSingleScopeWorkflow(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "CancelledSingleScopeWorkflow",
+ )
+ assert.NoError(t, err)
+
+ time.Sleep(time.Second)
+
+ err = s.Client().CancelWorkflow(context.Background(), w.GetID(), w.GetRunID())
+ assert.NoError(t, err)
+
+ var result interface{}
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, "OK", result)
+
+ e, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "getStatus")
+ assert.NoError(t, err)
+
+ trace := make([]string, 0)
+ assert.NoError(t, e.Get(&trace))
+ assert.Equal(
+ t,
+ []string{
+ "start",
+ "in scope",
+ "on cancel",
+ "captured in scope",
+ "captured in process",
+ },
+ trace,
+ )
+}
+
+func Test_CancelledMidflightWorkflow(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "CancelledMidflightWorkflow",
+ )
+ assert.NoError(t, err)
+
+ var result interface{}
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, "OK", result)
+
+ e, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "getStatus")
+ assert.NoError(t, err)
+
+ trace := make([]string, 0)
+ assert.NoError(t, e.Get(&trace))
+ assert.Equal(
+ t,
+ []string{
+ "start",
+ "in scope",
+ "on cancel",
+ "done cancel",
+ },
+ trace,
+ )
+
+ s.AssertNotContainsEvent(t, w, func(event *history.HistoryEvent) bool {
+ return event.EventType == enums.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED
+ })
+}
+
+func Test_CancelSignalledChildWorkflow(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "CancelSignalledChildWorkflow",
+ )
+ assert.NoError(t, err)
+
+ var result interface{}
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, "cancelled ok", result)
+
+ e, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "getStatus")
+ assert.NoError(t, err)
+
+ trace := make([]string, 0)
+ assert.NoError(t, e.Get(&trace))
+ assert.Equal(
+ t,
+ []string{
+ "start",
+ "child started",
+ "child signalled",
+ "scope cancelled",
+ "process done",
+ },
+ trace,
+ )
+
+ s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool {
+ return event.EventType == enums.EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED
+ })
+}
diff --git a/tests/plugins/temporal/child_test.go b/tests/plugins/temporal/child_test.go
new file mode 100644
index 00000000..49521791
--- /dev/null
+++ b/tests/plugins/temporal/child_test.go
@@ -0,0 +1,84 @@
+package tests
+
+import (
+ "context"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "go.temporal.io/sdk/client"
+)
+
+func Test_ExecuteChildWorkflow(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "WithChildWorkflow",
+ "Hello World",
+ )
+ assert.NoError(t, err)
+
+ var result string
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, "Child: CHILD HELLO WORLD", result)
+}
+
+func Test_ExecuteChildStubWorkflow(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "WithChildStubWorkflow",
+ "Hello World",
+ )
+ assert.NoError(t, err)
+
+ var result string
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, "Child: CHILD HELLO WORLD", result)
+}
+
+func Test_ExecuteChildStubWorkflow_02(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "ChildStubWorkflow",
+ "Hello World",
+ )
+ assert.NoError(t, err)
+
+ var result []string
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, []string{"HELLO WORLD", "UNTYPED"}, result)
+}
+
+func Test_SignalChildViaStubWorkflow(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "SignalChildViaStubWorkflow",
+ )
+ assert.NoError(t, err)
+
+ var result int
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, 8, result)
+}
diff --git a/tests/plugins/temporal/disaster_test.go b/tests/plugins/temporal/disaster_test.go
new file mode 100644
index 00000000..9ca4d018
--- /dev/null
+++ b/tests/plugins/temporal/disaster_test.go
@@ -0,0 +1,114 @@
+package tests
+
+import (
+ "context"
+ "os"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+ "go.temporal.io/sdk/client"
+)
+
+func Test_WorkerError_DisasterRecovery(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ p, err := os.FindProcess(int(s.workflows.Workers()[0].Pid()))
+ assert.NoError(t, err)
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "TimerWorkflow",
+ "Hello World",
+ )
+ assert.NoError(t, err)
+
+ time.Sleep(time.Millisecond * 750)
+
+ // must fully recover with new worker
+ assert.NoError(t, p.Kill())
+
+ var result string
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, "hello world", result)
+}
+
+func Test_WorkerError_DisasterRecovery_Heavy(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ defer func() {
+ // always restore script
+ _ = os.Rename("worker.bak", "worker.php")
+ }()
+
+ // Makes worker pool unable to recover for some time
+ _ = os.Rename("worker.php", "worker.bak")
+
+ p, err := os.FindProcess(int(s.workflows.Workers()[0].Pid()))
+ assert.NoError(t, err)
+
+ // must fully recover with new worker
+ assert.NoError(t, p.Kill())
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "TimerWorkflow",
+ "Hello World",
+ )
+ assert.NoError(t, err)
+
+ time.Sleep(time.Millisecond * 750)
+
+ // restore the script and recover activity pool
+ _ = os.Rename("worker.bak", "worker.php")
+
+ var result string
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, "hello world", result)
+}
+
+func Test_ActivityError_DisasterRecovery(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ defer func() {
+ // always restore script
+ _ = os.Rename("worker.bak", "worker.php")
+ }()
+
+ // Makes worker pool unable to recover for some time
+ _ = os.Rename("worker.php", "worker.bak")
+
+ // destroys all workers in activities
+ for _, wrk := range s.activities.Workers() {
+ assert.NoError(t, wrk.Kill())
+ }
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "SimpleWorkflow",
+ "Hello World",
+ )
+ assert.NoError(t, err)
+
+ // activity can't complete at this moment
+ time.Sleep(time.Millisecond * 750)
+
+ // restore the script and recover activity pool
+ _ = os.Rename("worker.bak", "worker.php")
+
+ var result string
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, "HELLO WORLD", result)
+}
diff --git a/tests/plugins/temporal/hp_test.go b/tests/plugins/temporal/hp_test.go
new file mode 100644
index 00000000..bceac025
--- /dev/null
+++ b/tests/plugins/temporal/hp_test.go
@@ -0,0 +1,408 @@
+package tests
+
+import (
+ "context"
+ "crypto/rand"
+ "crypto/sha512"
+ "fmt"
+ "testing"
+ "time"
+
+ "go.temporal.io/api/common/v1"
+
+ "github.com/fatih/color"
+ "github.com/stretchr/testify/assert"
+ "go.temporal.io/api/enums/v1"
+ "go.temporal.io/api/history/v1"
+ "go.temporal.io/sdk/client"
+)
+
+func init() {
+ color.NoColor = false
+}
+
+func Test_VerifyRegistration(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ assert.Contains(t, s.workflows.WorkflowNames(), "SimpleWorkflow")
+
+ assert.Contains(t, s.activities.ActivityNames(), "SimpleActivity.echo")
+ assert.Contains(t, s.activities.ActivityNames(), "HeartBeatActivity.doSomething")
+
+ assert.Contains(t, s.activities.ActivityNames(), "SimpleActivity.lower")
+}
+
+func Test_ExecuteSimpleWorkflow_1(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "SimpleWorkflow",
+ "Hello World",
+ )
+ assert.NoError(t, err)
+
+ var result string
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, "HELLO WORLD", result)
+}
+
+type User struct {
+ Name string
+ Email string
+}
+
+func Test_ExecuteSimpleDTOWorkflow(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "SimpleDTOWorkflow",
+ User{
+ Name: "Antony",
+ Email: "[email protected]",
+ },
+ )
+ assert.NoError(t, err)
+
+ var result struct{ Message string }
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, "Hello Antony <[email protected]>", result.Message)
+}
+
+func Test_ExecuteSimpleWorkflowWithSequenceInBatch(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "WorkflowWithSequence",
+ "Hello World",
+ )
+ assert.NoError(t, err)
+
+ var result string
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, "OK", result)
+}
+
+func Test_MultipleWorkflowsInSingleWorker(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "SimpleWorkflow",
+ "Hello World",
+ )
+ assert.NoError(t, err)
+
+ w2, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "TimerWorkflow",
+ "Hello World",
+ )
+ assert.NoError(t, err)
+
+ var result string
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, "HELLO WORLD", result)
+
+ assert.NoError(t, w2.Get(context.Background(), &result))
+ assert.Equal(t, "hello world", result)
+}
+
+func Test_ExecuteWorkflowWithParallelScopes(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "ParallelScopesWorkflow",
+ "Hello World",
+ )
+ assert.NoError(t, err)
+
+ var result string
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, "HELLO WORLD|Hello World|hello world", result)
+}
+
+func Test_Timer(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ start := time.Now()
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "TimerWorkflow",
+ "Hello World",
+ )
+ assert.NoError(t, err)
+
+ var result string
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, "hello world", result)
+ assert.True(t, time.Since(start).Seconds() > 1)
+
+ s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool {
+ return event.EventType == enums.EVENT_TYPE_TIMER_STARTED
+ })
+}
+
+func Test_SideEffect(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "SideEffectWorkflow",
+ "Hello World",
+ )
+ assert.NoError(t, err)
+
+ var result string
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Contains(t, result, "hello world-")
+
+ s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool {
+ return event.EventType == enums.EVENT_TYPE_MARKER_RECORDED
+ })
+}
+
+func Test_EmptyWorkflow(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "EmptyWorkflow",
+ "Hello World",
+ )
+ assert.NoError(t, err)
+
+ var result int
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, 42, result)
+}
+
+func Test_PromiseChaining(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "ChainedWorkflow",
+ "Hello World",
+ )
+ assert.NoError(t, err)
+
+ var result string
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, "result:hello world", result)
+}
+
+func Test_ActivityHeartbeat(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "SimpleHeartbeatWorkflow",
+ 2,
+ )
+ assert.NoError(t, err)
+
+ time.Sleep(time.Second)
+
+ we, err := s.Client().DescribeWorkflowExecution(context.Background(), w.GetID(), w.GetRunID())
+ assert.NoError(t, err)
+ assert.Len(t, we.PendingActivities, 1)
+
+ act := we.PendingActivities[0]
+
+ assert.Equal(t, `{"value":2}`, string(act.HeartbeatDetails.Payloads[0].Data))
+
+ var result string
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, "OK", result)
+}
+
+func Test_FailedActivityHeartbeat(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "FailedHeartbeatWorkflow",
+ 8,
+ )
+ assert.NoError(t, err)
+
+ time.Sleep(time.Second)
+
+ we, err := s.Client().DescribeWorkflowExecution(context.Background(), w.GetID(), w.GetRunID())
+ assert.NoError(t, err)
+ assert.Len(t, we.PendingActivities, 1)
+
+ act := we.PendingActivities[0]
+
+ assert.Equal(t, `{"value":8}`, string(act.HeartbeatDetails.Payloads[0].Data))
+
+ var result string
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, "OK!", result)
+}
+
+func Test_BinaryPayload(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ rnd := make([]byte, 2500)
+
+ _, err := rand.Read(rnd)
+ assert.NoError(t, err)
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "BinaryWorkflow",
+ rnd,
+ )
+ assert.NoError(t, err)
+
+ var result string
+ assert.NoError(t, w.Get(context.Background(), &result))
+
+ assert.Equal(t, fmt.Sprintf("%x", sha512.Sum512(rnd)), result)
+}
+
+func Test_ContinueAsNew(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "ContinuableWorkflow",
+ 1,
+ )
+ assert.NoError(t, err)
+
+ time.Sleep(time.Second)
+
+ we, err := s.Client().DescribeWorkflowExecution(context.Background(), w.GetID(), w.GetRunID())
+ assert.NoError(t, err)
+
+ assert.Equal(t, "ContinuedAsNew", we.WorkflowExecutionInfo.Status.String())
+
+ time.Sleep(time.Second)
+
+ // the result of the final workflow
+ var result string
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, "OK6", result)
+}
+
+func Test_ActivityStubWorkflow(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "ActivityStubWorkflow",
+ "hello world",
+ )
+ assert.NoError(t, err)
+
+ // the result of the final workflow
+ var result []string
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, []string{
+ "HELLO WORLD",
+ "invalid method call",
+ "UNTYPED",
+ }, result)
+}
+
+func Test_ExecuteProtoWorkflow(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "ProtoPayloadWorkflow",
+ )
+ assert.NoError(t, err)
+
+ var result common.WorkflowExecution
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, "updated", result.RunId)
+ assert.Equal(t, "workflow id", result.WorkflowId)
+}
+
+func Test_SagaWorkflow(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "SagaWorkflow",
+ )
+ assert.NoError(t, err)
+
+ var result string
+ assert.Error(t, w.Get(context.Background(), &result))
+}
diff --git a/tests/plugins/temporal/query_test.go b/tests/plugins/temporal/query_test.go
new file mode 100644
index 00000000..8b0caeee
--- /dev/null
+++ b/tests/plugins/temporal/query_test.go
@@ -0,0 +1,66 @@
+package tests
+
+import (
+ "context"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+ "go.temporal.io/sdk/client"
+)
+
+func Test_ListQueries(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "QueryWorkflow",
+ "Hello World",
+ )
+ assert.NoError(t, err)
+
+ time.Sleep(time.Millisecond * 500)
+
+ v, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "error", -1)
+ assert.Nil(t, v)
+ assert.Error(t, err)
+
+ assert.Contains(t, err.Error(), "KnownQueryTypes=[get]")
+
+ var r int
+ assert.NoError(t, w.Get(context.Background(), &r))
+ assert.Equal(t, 0, r)
+}
+
+func Test_GetQuery(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "QueryWorkflow",
+ "Hello World",
+ )
+ assert.NoError(t, err)
+
+ err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "add", 88)
+ assert.NoError(t, err)
+ time.Sleep(time.Millisecond * 500)
+
+ v, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "get", nil)
+ assert.NoError(t, err)
+
+ var r int
+ assert.NoError(t, v.Get(&r))
+ assert.Equal(t, 88, r)
+
+ assert.NoError(t, w.Get(context.Background(), &r))
+ assert.Equal(t, 88, r)
+}
diff --git a/tests/plugins/temporal/server_test.go b/tests/plugins/temporal/server_test.go
new file mode 100644
index 00000000..03e4ed7f
--- /dev/null
+++ b/tests/plugins/temporal/server_test.go
@@ -0,0 +1,203 @@
+package tests
+
+import (
+ "context"
+ "testing"
+
+ endure "github.com/spiral/endure/pkg/container"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/informer"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/resetter"
+ "github.com/spiral/roadrunner/v2/plugins/rpc"
+ "github.com/spiral/roadrunner/v2/plugins/server"
+ "github.com/spiral/roadrunner/v2/plugins/temporal/activity"
+ rrClient "github.com/spiral/roadrunner/v2/plugins/temporal/client"
+ "github.com/spiral/roadrunner/v2/plugins/temporal/workflow"
+ "go.temporal.io/api/enums/v1"
+ "go.temporal.io/api/history/v1"
+ temporalClient "go.temporal.io/sdk/client"
+ "go.uber.org/zap"
+ "go.uber.org/zap/zapcore"
+)
+
+type TestServer struct {
+ container endure.Container
+ temporal rrClient.Temporal
+ activities *activity.Plugin
+ workflows *workflow.Plugin
+}
+
+type ConfigOption struct {
+ Name string
+ Value interface{}
+}
+
+func NewOption(name string, value interface{}) ConfigOption {
+ return ConfigOption{Name: name, Value: value}
+}
+
+func NewTestServer(opt ...ConfigOption) *TestServer {
+ e, err := endure.NewContainer(initLogger(), endure.RetryOnFail(false))
+ if err != nil {
+ panic(err)
+ }
+
+ t := &rrClient.Plugin{}
+ a := &activity.Plugin{}
+ w := &workflow.Plugin{}
+
+ if err := e.Register(initConfig(opt...)); err != nil {
+ panic(err)
+ }
+
+ if err := e.Register(&logger.ZapLogger{}); err != nil {
+ panic(err)
+ }
+ if err := e.Register(&resetter.Plugin{}); err != nil {
+ panic(err)
+ }
+ if err := e.Register(&informer.Plugin{}); err != nil {
+ panic(err)
+ }
+ if err := e.Register(&server.Plugin{}); err != nil {
+ panic(err)
+ }
+ if err := e.Register(&rpc.Plugin{}); err != nil {
+ panic(err)
+ }
+
+ if err := e.Register(t); err != nil {
+ panic(err)
+ }
+ if err := e.Register(a); err != nil {
+ panic(err)
+ }
+ if err := e.Register(w); err != nil {
+ panic(err)
+ }
+
+ if err := e.Init(); err != nil {
+ panic(err)
+ }
+
+ errCh, err := e.Serve()
+ if err != nil {
+ panic(err)
+ }
+
+ go func() {
+ err := <-errCh
+ er := e.Stop()
+ if er != nil {
+ panic(err)
+ }
+ }()
+
+ return &TestServer{container: e, temporal: t, activities: a, workflows: w}
+}
+
+func (s *TestServer) Client() temporalClient.Client {
+ c, err := s.temporal.GetClient()
+ if err != nil {
+ panic(err)
+ }
+
+ return c
+}
+
+func (s *TestServer) MustClose() {
+ err := s.container.Stop()
+ if err != nil {
+ panic(err)
+ }
+}
+
+func initConfig(opt ...ConfigOption) config.Configurer {
+ cfg := &config.Viper{}
+ cfg.Path = ".rr.yaml"
+ cfg.Prefix = "rr"
+
+ return cfg
+}
+
+func initLogger() *zap.Logger {
+ cfg := zap.Config{
+ Level: zap.NewAtomicLevelAt(zap.ErrorLevel),
+ Encoding: "console",
+ EncoderConfig: zapcore.EncoderConfig{
+ MessageKey: "message",
+ LevelKey: "level",
+ TimeKey: "time",
+ CallerKey: "caller",
+ NameKey: "name",
+ StacktraceKey: "stack",
+ EncodeLevel: zapcore.CapitalLevelEncoder,
+ EncodeTime: zapcore.ISO8601TimeEncoder,
+ EncodeCaller: zapcore.ShortCallerEncoder,
+ },
+ OutputPaths: []string{"stderr"},
+ ErrorOutputPaths: []string{"stderr"},
+ }
+
+ l, err := cfg.Build(zap.AddCaller())
+ if err != nil {
+ panic(err)
+ }
+
+ return l
+}
+
+func (s *TestServer) AssertContainsEvent(t *testing.T, w temporalClient.WorkflowRun, assert func(*history.HistoryEvent) bool) {
+ i := s.Client().GetWorkflowHistory(
+ context.Background(),
+ w.GetID(),
+ w.GetRunID(),
+ false,
+ enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT,
+ )
+
+ for {
+ if !i.HasNext() {
+ t.Error("no more events and no match found")
+ break
+ }
+
+ e, err := i.Next()
+ if err != nil {
+ t.Error("unable to read history event")
+ break
+ }
+
+ if assert(e) {
+ break
+ }
+ }
+}
+
+func (s *TestServer) AssertNotContainsEvent(t *testing.T, w temporalClient.WorkflowRun, assert func(*history.HistoryEvent) bool) {
+ i := s.Client().GetWorkflowHistory(
+ context.Background(),
+ w.GetID(),
+ w.GetRunID(),
+ false,
+ enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT,
+ )
+
+ for {
+ if !i.HasNext() {
+ break
+ }
+
+ e, err := i.Next()
+ if err != nil {
+ t.Error("unable to read history event")
+ break
+ }
+
+ if assert(e) {
+ t.Error("found unexpected event")
+ break
+ }
+ }
+}
diff --git a/tests/plugins/temporal/signal_test.go b/tests/plugins/temporal/signal_test.go
new file mode 100644
index 00000000..51826287
--- /dev/null
+++ b/tests/plugins/temporal/signal_test.go
@@ -0,0 +1,170 @@
+package tests
+
+import (
+ "context"
+ "testing"
+ "time"
+
+ "github.com/pborman/uuid"
+ "github.com/stretchr/testify/assert"
+ "go.temporal.io/api/enums/v1"
+ "go.temporal.io/api/history/v1"
+ "go.temporal.io/sdk/client"
+)
+
+func Test_SignalsWithoutSignals(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "SimpleSignalledWorkflow",
+ "Hello World",
+ )
+ assert.NoError(t, err)
+
+ var result int
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, 0, result)
+}
+
+func Test_SendSignalDuringTimer(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().SignalWithStartWorkflow(
+ context.Background(),
+ "signalled-"+uuid.New(),
+ "add",
+ 10,
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "SimpleSignalledWorkflow",
+ )
+ assert.NoError(t, err)
+
+ err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "add", -1)
+ assert.NoError(t, err)
+
+ var result int
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, 9, result)
+
+ s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool {
+ if event.EventType == enums.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED {
+ attr := event.Attributes.(*history.HistoryEvent_WorkflowExecutionSignaledEventAttributes)
+ return attr.WorkflowExecutionSignaledEventAttributes.SignalName == "add"
+ }
+
+ return false
+ })
+}
+
+func Test_SendSignalBeforeCompletingWorkflow(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "SimpleSignalledWorkflowWithSleep",
+ )
+ assert.NoError(t, err)
+
+ // should be around sleep(1) call
+ time.Sleep(time.Second + time.Millisecond*200)
+
+ err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "add", -1)
+ assert.NoError(t, err)
+
+ var result int
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, -1, result)
+
+ s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool {
+ if event.EventType == enums.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED {
+ attr := event.Attributes.(*history.HistoryEvent_WorkflowExecutionSignaledEventAttributes)
+ return attr.WorkflowExecutionSignaledEventAttributes.SignalName == "add"
+ }
+
+ return false
+ })
+}
+
+func Test_RuntimeSignal(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().SignalWithStartWorkflow(
+ context.Background(),
+ "signalled-"+uuid.New(),
+ "add",
+ -1,
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "RuntimeSignalWorkflow",
+ )
+ assert.NoError(t, err)
+
+ var result int
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, -1, result)
+
+ s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool {
+ if event.EventType == enums.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED {
+ attr := event.Attributes.(*history.HistoryEvent_WorkflowExecutionSignaledEventAttributes)
+ return attr.WorkflowExecutionSignaledEventAttributes.SignalName == "add"
+ }
+
+ return false
+ })
+}
+
+func Test_SignalSteps(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "WorkflowWithSignalledSteps",
+ )
+ assert.NoError(t, err)
+
+ err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "begin", true)
+ assert.NoError(t, err)
+
+ err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "next1", true)
+ assert.NoError(t, err)
+
+ v, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "value", nil)
+ assert.NoError(t, err)
+
+ var r int
+ assert.NoError(t, v.Get(&r))
+ assert.Equal(t, 2, r)
+
+ err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "next2", true)
+ assert.NoError(t, err)
+
+ v, err = s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "value", nil)
+ assert.NoError(t, err)
+
+ assert.NoError(t, v.Get(&r))
+ assert.Equal(t, 3, r)
+
+ var result int
+ assert.NoError(t, w.Get(context.Background(), &result))
+
+ // 3 ticks
+ assert.Equal(t, 3, result)
+}
diff --git a/tests/plugins/temporal/worker.php b/tests/plugins/temporal/worker.php
new file mode 100644
index 00000000..0d0263e7
--- /dev/null
+++ b/tests/plugins/temporal/worker.php
@@ -0,0 +1,33 @@
+<?php
+
+declare(strict_types=1);
+
+require __DIR__ . '/../../vendor/autoload.php';
+
+/**
+ * @param string $dir
+ * @return array<string>
+ */
+$getClasses = static function (string $dir): iterable {
+ $files = glob($dir . '/*.php');
+
+ foreach ($files as $file) {
+ yield substr(basename($file), 0, -4);
+ }
+};
+
+$factory = \Temporal\WorkerFactory::create();
+
+$worker = $factory->newWorker('default');
+
+// register all workflows
+foreach ($getClasses(__DIR__ . '/../../temporal/Workflow') as $name) {
+ $worker->registerWorkflowType('Temporal\\Tests\\Workflow\\' . $name);
+}
+
+// register all activity
+foreach ($getClasses(__DIR__ . '/../../temporal/Activity') as $name) {
+ $worker->registerActivityType('Temporal\\Tests\\Activity\\' . $name);
+}
+
+$factory->run();