summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-01-26 11:52:03 +0300
committerGitHub <[email protected]>2021-01-26 11:52:03 +0300
commite2266b80db47444ba5858c736833a8a81b1361ad (patch)
tree37e06810352752f88032f7d0eadb554fa18b98da
parentfae4711e3548bfd2e34f13aabfaab6a5b4e317c6 (diff)
parenta392d962508e1bc9e497c8c4ef021425bc2c67c2 (diff)
Merge pull request #502 from spiral/plugin/temporalv2.0.0-beta12
plugin(temporal): Add temporal plugins set to the RR2
-rw-r--r--.github/workflows/linux.yml3
-rw-r--r--.github/workflows/macos.yml2
-rwxr-xr-x.rr.yaml8
-rwxr-xr-xMakefile113
-rw-r--r--cmd/main.go8
-rw-r--r--go.mod10
-rw-r--r--go.sum92
-rw-r--r--plugins/http/plugin.go4
-rw-r--r--plugins/temporal/activity/activity_pool.go197
-rw-r--r--plugins/temporal/activity/plugin.go215
-rw-r--r--plugins/temporal/activity/rpc.go66
-rw-r--r--plugins/temporal/client/doc/doc.go1
-rw-r--r--plugins/temporal/client/doc/temporal.drawio1
-rw-r--r--plugins/temporal/client/plugin.go169
-rw-r--r--plugins/temporal/protocol/converter.go76
-rw-r--r--plugins/temporal/protocol/converter_test.go23
-rw-r--r--plugins/temporal/protocol/internal/protocol.pb.go167
-rw-r--r--plugins/temporal/protocol/json_codec.go225
-rw-r--r--plugins/temporal/protocol/message.go334
-rw-r--r--plugins/temporal/protocol/proto_codec.go145
-rw-r--r--plugins/temporal/protocol/protocol.go77
-rw-r--r--plugins/temporal/protocol/worker_info.go72
-rw-r--r--plugins/temporal/workflow/canceller.go41
-rw-r--r--plugins/temporal/workflow/canceller_test.go33
-rw-r--r--plugins/temporal/workflow/id_registry.go51
-rw-r--r--plugins/temporal/workflow/message_queue.go47
-rw-r--r--plugins/temporal/workflow/message_queue_test.go53
-rw-r--r--plugins/temporal/workflow/plugin.go203
-rw-r--r--plugins/temporal/workflow/process.go436
-rw-r--r--plugins/temporal/workflow/workflow_pool.go190
-rw-r--r--tests/composer.json7
-rw-r--r--tests/docker-compose.yaml34
-rw-r--r--tests/plugins/http/handler_test.go8
-rw-r--r--tests/plugins/temporal/.rr.yaml22
-rw-r--r--tests/plugins/temporal/cancel_test.go291
-rw-r--r--tests/plugins/temporal/child_test.go84
-rw-r--r--tests/plugins/temporal/disaster_test.go114
-rw-r--r--tests/plugins/temporal/hp_test.go408
-rw-r--r--tests/plugins/temporal/query_test.go66
-rw-r--r--tests/plugins/temporal/server_test.go198
-rw-r--r--tests/plugins/temporal/signal_test.go170
-rw-r--r--tests/plugins/temporal/worker.php33
-rw-r--r--tests/psr-worker-bench.php62
-rw-r--r--tests/temporal/Activity/HeartBeatActivity.php58
-rw-r--r--tests/temporal/Activity/SimpleActivity.php63
-rw-r--r--tests/temporal/Client/StartNewWorkflow.php23
-rw-r--r--tests/temporal/DTO/Message.php14
-rw-r--r--tests/temporal/DTO/User.php15
-rw-r--r--tests/temporal/Workflow/ActivityStubWorkflow.php39
-rw-r--r--tests/temporal/Workflow/AggregatedWorkflow.php30
-rw-r--r--tests/temporal/Workflow/AsyncActivityWorkflow.php28
-rw-r--r--tests/temporal/Workflow/BinaryWorkflow.php21
-rw-r--r--tests/temporal/Workflow/CancelSignalledChildWorkflow.php57
-rw-r--r--tests/temporal/Workflow/CanceledHeartbeatWorkflow.php29
-rw-r--r--tests/temporal/Workflow/CancelledMidflightWorkflow.php47
-rw-r--r--tests/temporal/Workflow/CancelledNestedWorkflow.php72
-rw-r--r--tests/temporal/Workflow/CancelledScopeWorkflow.php39
-rw-r--r--tests/temporal/Workflow/CancelledSingleScopeWorkflow.php55
-rw-r--r--tests/temporal/Workflow/CancelledWithCompensationWorkflow.php79
-rw-r--r--tests/temporal/Workflow/CancelledWorkflow.php31
-rw-r--r--tests/temporal/Workflow/ChainedWorkflow.php31
-rw-r--r--tests/temporal/Workflow/ChildStubWorkflow.php30
-rw-r--r--tests/temporal/Workflow/ComplexExceptionalWorkflow.php26
-rw-r--r--tests/temporal/Workflow/ContinuableWorkflow.php38
-rw-r--r--tests/temporal/Workflow/EmptyWorkflow.php16
-rw-r--r--tests/temporal/Workflow/ExceptionalActivityWorkflow.php25
-rw-r--r--tests/temporal/Workflow/ExceptionalWorkflow.php18
-rw-r--r--tests/temporal/Workflow/FailedHeartbeatWorkflow.php30
-rw-r--r--tests/temporal/Workflow/LoopWithSignalCoroutinesWorkflow.php55
-rw-r--r--tests/temporal/Workflow/LoopWorkflow.php51
-rw-r--r--tests/temporal/Workflow/ParallelScopesWorkflow.php36
-rw-r--r--tests/temporal/Workflow/PeriodicWorkflow.php19
-rw-r--r--tests/temporal/Workflow/ProtoPayloadWorkflow.php33
-rw-r--r--tests/temporal/Workflow/QueryWorkflow.php41
-rw-r--r--tests/temporal/Workflow/RuntimeSignalWorkflow.php31
-rw-r--r--tests/temporal/Workflow/SagaWorkflow.php54
-rw-r--r--tests/temporal/Workflow/SideEffectWorkflow.php30
-rw-r--r--tests/temporal/Workflow/SignalChildViaStubWorkflow.php25
-rw-r--r--tests/temporal/Workflow/SimpleDTOWorkflow.php35
-rw-r--r--tests/temporal/Workflow/SimpleHeartbeatWorkflow.php25
-rw-r--r--tests/temporal/Workflow/SimpleSignalledWorkflow.php30
-rw-r--r--tests/temporal/Workflow/SimpleSignalledWorkflowWithSleep.php34
-rw-r--r--tests/temporal/Workflow/SimpleWorkflow.php31
-rw-r--r--tests/temporal/Workflow/TimerWorkflow.php27
-rw-r--r--tests/temporal/Workflow/WaitWorkflow.php33
-rw-r--r--tests/temporal/Workflow/WithChildStubWorkflow.php20
-rw-r--r--tests/temporal/Workflow/WithChildWorkflow.php25
-rw-r--r--tests/temporal/Workflow/WorkflowWithSequence.php30
-rw-r--r--tests/temporal/Workflow/WorkflowWithSignalledSteps.php51
89 files changed, 6111 insertions, 78 deletions
diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml
index 27df293a..076de74c 100644
--- a/.github/workflows/linux.yml
+++ b/.github/workflows/linux.yml
@@ -69,6 +69,9 @@ jobs:
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pool.txt -covermode=atomic ./pkg/pool
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker.txt -covermode=atomic ./pkg/worker
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker_stack.txt -covermode=atomic ./pkg/worker_watcher
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/temporal.out -covermode=atomic ./tests/plugins/temporal
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/temporal_protocol.out -covermode=atomic ./plugins/temporal/protocol
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/temporal_workflow.out -covermode=atomic ./plugins/temporal/workflow
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/http_config.txt -covermode=atomic ./plugins/http/config
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/http.txt -covermode=atomic ./tests/plugins/http
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/informer.txt -covermode=atomic ./tests/plugins/informer
diff --git a/.github/workflows/macos.yml b/.github/workflows/macos.yml
index 77f9cfda..00b51598 100644
--- a/.github/workflows/macos.yml
+++ b/.github/workflows/macos.yml
@@ -67,6 +67,8 @@ jobs:
go test -v -race -tags=debug ./pkg/pool
go test -v -race -tags=debug ./pkg/worker
go test -v -race -tags=debug ./pkg/worker_watcher
+ go test -v -race -tags=debug ./plugins/temporal/protocol
+ go test -v -race -tags=debug ./plugins/temporal/workflow
go test -v -race -tags=debug ./plugins/http/config
go test -v -race -tags=debug ./tests/plugins/http
go test -v -race -tags=debug ./tests/plugins/informer
diff --git a/.rr.yaml b/.rr.yaml
index 06e452ce..1c1a0af2 100755
--- a/.rr.yaml
+++ b/.rr.yaml
@@ -15,6 +15,14 @@ logs:
mode: development
level: error
+# Workflow and activity mesh service
+temporal:
+ address: localhost:7233
+ activities:
+ num_workers: 4
+ codec: json
+ debug_level: 2
+
http:
# host and port separated by semicolon
address: 127.0.0.1:44933
diff --git a/Makefile b/Makefile
index 0a39ef77..bfe915f5 100755
--- a/Makefile
+++ b/Makefile
@@ -32,6 +32,9 @@ test_coverage:
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/pool.out -covermode=atomic ./pkg/pool
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/worker.out -covermode=atomic ./pkg/worker
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/worker_stack.out -covermode=atomic ./pkg/worker_watcher
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/temporal.out -covermode=atomic ./tests/plugins/temporal
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/temporal_protocol.out -covermode=atomic ./plugins/temporal/protocol
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/temporal_workflow.out -covermode=atomic ./plugins/temporal/workflow
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/http.out -covermode=atomic ./tests/plugins/http
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/informer.out -covermode=atomic ./tests/plugins/informer
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/reload.out -covermode=atomic ./tests/plugins/reload
@@ -58,62 +61,68 @@ test_coverage:
test: ## Run application tests
docker-compose -f tests/docker-compose.yaml up -d
- go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/transport/pipe
- go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/transport/socket
- go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/pool
- go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/worker
- go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/worker_watcher
- go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/http
- go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./plugins/http/config
- go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/informer
- go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/reload
- go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/server
- go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/checker
- go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/config
- go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/gzip
- go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/headers
- go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/logger
- go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/metrics
- go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/redis
- go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/resetter
- go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/rpc
- go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/static
- go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./plugins/kv/boltdb
- go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./plugins/kv/memory
- go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./plugins/kv/memcached
- go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/kv/boltdb
- go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/kv/memory
- go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/kv/memcached
+ go test -v -race -tags=debug ./pkg/transport/pipe
+ go test -v -race -tags=debug ./pkg/transport/socket
+ go test -v -race -tags=debug ./pkg/pool
+ go test -v -race -tags=debug ./pkg/worker
+ go test -v -race -tags=debug ./pkg/worker_watcher
+ go test -v -race -tags=debug ./tests/plugins/temporal
+ go test -v -race -tags=debug ./plugins/temporal/protocol
+ go test -v -race -tags=debug ./plugins/temporal/workflow
+ go test -v -race -tags=debug ./tests/plugins/http
+ go test -v -race -tags=debug ./plugins/http/config
+ go test -v -race -tags=debug ./tests/plugins/informer
+ go test -v -race -tags=debug ./tests/plugins/reload
+ go test -v -race -tags=debug ./tests/plugins/server
+ go test -v -race -tags=debug ./tests/plugins/checker
+ go test -v -race -tags=debug ./tests/plugins/config
+ go test -v -race -tags=debug ./tests/plugins/gzip
+ go test -v -race -tags=debug ./tests/plugins/headers
+ go test -v -race -tags=debug ./tests/plugins/logger
+ go test -v -race -tags=debug ./tests/plugins/metrics
+ go test -v -race -tags=debug ./tests/plugins/redis
+ go test -v -race -tags=debug ./tests/plugins/resetter
+ go test -v -race -tags=debug ./tests/plugins/rpc
+ go test -v -race -tags=debug ./tests/plugins/static
+ go test -v -race -tags=debug ./plugins/kv/boltdb
+ go test -v -race -tags=debug ./plugins/kv/memory
+ go test -v -race -tags=debug ./plugins/kv/memcached
+ go test -v -race -tags=debug ./tests/plugins/kv/boltdb
+ go test -v -race -tags=debug ./tests/plugins/kv/memory
+ go test -v -race -tags=debug ./tests/plugins/kv/memcached
docker-compose -f tests/docker-compose.yaml down
test_1.14: ## Run application tests
docker-compose -f tests/docker-compose.yaml up -d
- go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/transport/pipe
- go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/transport/socket
- go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/pool
- go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/worker
- go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/worker_watcher
- go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/http
- go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./plugins/http/config
- go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/informer
- go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/reload
- go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/server
- go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/checker
- go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/config
- go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/gzip
- go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/headers
- go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/logger
- go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/metrics
- go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/redis
- go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/resetter
- go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/rpc
- go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/static
- go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./plugins/kv/boltdb
- go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./plugins/kv/memory
- go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./plugins/kv/memcached
- go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/kv/boltdb
- go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/kv/memory
- go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/kv/memcached
+ go1.14.14 test -v -race -tags=debug ./pkg/transport/pipe
+ go1.14.14 test -v -race -tags=debug ./pkg/transport/socket
+ go1.14.14 test -v -race -tags=debug ./pkg/pool
+ go1.14.14 test -v -race -tags=debug ./pkg/worker
+ go1.14.14 test -v -race -tags=debug ./pkg/worker_watcher
+ go1.14.14 test -v -race -tags=debug ./tests/plugins/temporal
+ go1.14.14 test -v -race -tags=debug ./plugins/temporal/protocol
+ go1.14.14 test -v -race -tags=debug ./plugins/temporal/workflow
+ go1.14.14 test -v -race -tags=debug ./tests/plugins/http
+ go1.14.14 test -v -race -tags=debug ./plugins/http/config
+ go1.14.14 test -v -race -tags=debug ./tests/plugins/informer
+ go1.14.14 test -v -race -tags=debug ./tests/plugins/reload
+ go1.14.14 test -v -race -tags=debug ./tests/plugins/server
+ go1.14.14 test -v -race -tags=debug ./tests/plugins/checker
+ go1.14.14 test -v -race -tags=debug ./tests/plugins/config
+ go1.14.14 test -v -race -tags=debug ./tests/plugins/gzip
+ go1.14.14 test -v -race -tags=debug ./tests/plugins/headers
+ go1.14.14 test -v -race -tags=debug ./tests/plugins/logger
+ go1.14.14 test -v -race -tags=debug ./tests/plugins/metrics
+ go1.14.14 test -v -race -tags=debug ./tests/plugins/redis
+ go1.14.14 test -v -race -tags=debug ./tests/plugins/resetter
+ go1.14.14 test -v -race -tags=debug ./tests/plugins/rpc
+ go1.14.14 test -v -race -tags=debug ./tests/plugins/static
+ go1.14.14 test -v -race -tags=debug ./plugins/kv/boltdb
+ go1.14.14 test -v -race -tags=debug ./plugins/kv/memory
+ go1.14.14 test -v -race -tags=debug ./plugins/kv/memcached
+ go1.14.14 test -v -race -tags=debug ./tests/plugins/kv/boltdb
+ go1.14.14 test -v -race -tags=debug ./tests/plugins/kv/memory
+ go1.14.14 test -v -race -tags=debug ./tests/plugins/kv/memcached
docker-compose -f tests/docker-compose.yaml down
test_pipeline: test_1.14 test
diff --git a/cmd/main.go b/cmd/main.go
index 1dd19107..8074f316 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -7,6 +7,9 @@ import (
"github.com/spiral/roadrunner/v2/cmd/cli"
httpPlugin "github.com/spiral/roadrunner/v2/plugins/http"
"github.com/spiral/roadrunner/v2/plugins/informer"
+ "github.com/spiral/roadrunner/v2/plugins/temporal/activity"
+ temporalClient "github.com/spiral/roadrunner/v2/plugins/temporal/client"
+ "github.com/spiral/roadrunner/v2/plugins/temporal/workflow"
"github.com/spiral/roadrunner/v2/plugins/kv/boltdb"
"github.com/spiral/roadrunner/v2/plugins/kv/memcached"
@@ -52,6 +55,11 @@ func main() {
&memory.Plugin{},
// boltdb driver
&boltdb.Plugin{},
+
+ // temporal plugins
+ &temporalClient.Plugin{},
+ &activity.Plugin{},
+ &workflow.Plugin{},
)
if err != nil {
log.Fatal(err)
diff --git a/go.mod b/go.mod
index 5109f5c3..588a60a5 100644
--- a/go.mod
+++ b/go.mod
@@ -8,16 +8,19 @@ require (
github.com/alicebob/miniredis/v2 v2.14.1
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
github.com/buger/goterm v0.0.0-20200322175922-2f3e71b85129
+ github.com/cenkalti/backoff/v4 v4.1.0
github.com/dustin/go-humanize v1.0.0
github.com/fatih/color v1.10.0
github.com/go-ole/go-ole v1.2.5 // indirect
github.com/go-redis/redis/v8 v8.4.10
github.com/gofiber/fiber/v2 v2.3.3
github.com/golang/mock v1.4.4
+ github.com/golang/protobuf v1.4.3
github.com/hashicorp/go-multierror v1.1.0
github.com/json-iterator/go v1.1.10
github.com/mattn/go-runewidth v0.0.10
github.com/olekukonko/tablewriter v0.0.4
+ github.com/pborman/uuid v1.2.1
github.com/prometheus/client_golang v1.9.0
github.com/shirou/gopsutil v3.20.12+incompatible
github.com/spf13/cobra v1.1.1
@@ -30,9 +33,12 @@ require (
github.com/vbauerster/mpb/v5 v5.4.0
github.com/yookoala/gofast v0.4.0
go.etcd.io/bbolt v1.3.5
+ go.temporal.io/api v1.4.0
+ go.temporal.io/sdk v1.4.0
go.uber.org/multierr v1.6.0
go.uber.org/zap v1.16.0
- golang.org/x/net v0.0.0-20201216054612-986b41b23924
+ golang.org/x/net v0.0.0-20201224014010-6772e930b67b
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a
- golang.org/x/sys v0.0.0-20201221093633-bc327ba9c2f0
+ golang.org/x/sys v0.0.0-20210105210732-16f7687f5001
+ google.golang.org/protobuf v1.25.0
)
diff --git a/go.sum b/go.sum
index 8c05b074..2288dbca 100644
--- a/go.sum
+++ b/go.sum
@@ -74,6 +74,7 @@ github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5P
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/clbanning/x2j v0.0.0-20191024224557-825249438eec/go.mod h1:jMjuTZXRI4dUb/I5gc9Hdhagfvm9+RyrPryS/auMzxE=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
+github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8=
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI=
github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
@@ -87,6 +88,7 @@ github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfc
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
+github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -102,8 +104,12 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g=
+github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
+github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5ynNVH9qI8YYLbd1fK2po=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
+github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a h1:yDWHCSQ40h88yih2JAcL6Ls/kVkSE8GFACTGVnMPruw=
+github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a/go.mod h1:7Ga40egUymuWXxAe151lTNnCv97MddSOVsjpPPkityA=
github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/color v1.10.0 h1:s36xzo75JdqLaaWoiEHk767eHiwo0598uUxyfiPkDsg=
@@ -123,20 +129,30 @@ github.com/go-kit/kit v0.10.0/go.mod h1:xUsJbQ/Fp4kEt7AFgCuvyX4a71u8h9jB8tj/ORgO
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
+github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM=
github.com/go-ole/go-ole v1.2.5 h1:t4MGB5xEDZvXI+0rMjjsfBsD7yAgp/s9ZDkL1JndXwY=
github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
+github.com/go-redis/redis/v8 v8.4.4/go.mod h1:nA0bQuF0i5JFx4Ta9RZxGKXFrQ8cRWntra97f0196iY=
github.com/go-redis/redis/v8 v8.4.10 h1:fWdl0RBmVibUDOp8bqz1e2Yy9dShOeIeWsiAifYk06Y=
github.com/go-redis/redis/v8 v8.4.10/go.mod h1:d5yY/TlkQyYBSBHnXUmnf1OrHbyQere5JV4dLKwvXmo=
github.com/go-restit/lzjson v0.0.0-20161206095556-efe3c53acc68/go.mod h1:7vXSKQt83WmbPeyVjCfNT9YDJ5BUFmcwFsEjI9SCvYM=
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
+github.com/gofiber/fiber/v2 v2.3.0/go.mod h1:f8BRRIMjMdRyt2qmJ/0Sea3j3rwwfufPrh9WNBRiVZ0=
github.com/gofiber/fiber/v2 v2.3.3 h1:nsjc9TfCl+ojXgEAu+uAT1Le7iQtZJ+Gfb/ox6+BM4w=
github.com/gofiber/fiber/v2 v2.3.3/go.mod h1:f8BRRIMjMdRyt2qmJ/0Sea3j3rwwfufPrh9WNBRiVZ0=
+github.com/gogo/googleapis v0.0.0-20180223154316-0cd9801be74a/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s=
github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s=
+github.com/gogo/googleapis v1.4.0 h1:zgVt4UpGxcqVOw97aRGxT4svlcmdK35fynLNctY32zI=
+github.com/gogo/googleapis v1.4.0/go.mod h1:5YRNX2z1oM5gXdAkurHa942MDgEJyk02w4OecKY87+c=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
+github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls=
+github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
+github.com/gogo/status v1.1.0 h1:+eIkrewn5q6b30y+g/BJINVVdi2xH7je5MPJ3ZPK3JA=
+github.com/gogo/status v1.1.0/go.mod h1:BFv9nrluPLmrS0EmGVvLaPNmRosr9KapBYd5/hpY1WM=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
@@ -156,6 +172,7 @@ github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:x
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
+github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8=
github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0=
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.4.3 h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM=
@@ -167,6 +184,7 @@ github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5a
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
@@ -175,6 +193,9 @@ github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OI
github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/google/uuid v1.1.4 h1:0ecGp3skIrHWPNGPJDaBIghfA6Sp7Ruo2Io8eLKzWm0=
+github.com/google/uuid v1.1.4/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/gopherjs/gopherjs v0.0.0-20180825215210-0210a2f0f73c/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
@@ -238,6 +259,7 @@ github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfV
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
+github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.10.7 h1:7rix8v8GpI3ZBb0nSozFRgbtXKv+hOe+qfEpZqybrAg=
github.com/klauspost/compress v1.10.7/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
@@ -250,6 +272,7 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
+github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM=
github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4=
github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=
@@ -297,6 +320,7 @@ github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzE
github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
+github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/oklog/oklog v0.3.2/go.mod h1:FCV+B7mhrz4o+ueLpx+KqkyXRGMWOYEvfiXtdGtbWGs=
@@ -320,6 +344,8 @@ github.com/opentracing-contrib/go-observer v0.0.0-20170622124052-a52f23424492/go
github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74=
github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
+github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs=
+github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc=
github.com/openzipkin-contrib/zipkin-go-opentracing v0.4.5/go.mod h1:/wsWhb9smxSfWAKL3wpBW7V8scJMt8N8gnaMCS9E/cA=
github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJc5AZX7/PBEpw=
github.com/openzipkin/zipkin-go v0.2.1/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnhQw8ySjnjRyN4=
@@ -327,6 +353,8 @@ github.com/openzipkin/zipkin-go v0.2.2/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnh
github.com/pact-foundation/pact-go v1.0.4/go.mod h1:uExwJY4kCzNPcHRj+hCR/HBbOOIwwtUjcrb0b5/5kLM=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k=
+github.com/pborman/uuid v1.2.1 h1:+ZZIw58t/ozdjRaXh/3awHfmWRbzYxJoAdNJxe/3pvw=
+github.com/pborman/uuid v1.2.1/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k=
github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/performancecopilot/speed v3.0.0+incompatible/go.mod h1:/CLtqpZ5gBg1M9iaPbIdPPGyKcA8hKdoy6hAWba7Yac=
@@ -383,6 +411,8 @@ github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40T
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rivo/uniseg v0.1.0 h1:+2KBaVoUmb9XzDsrx/Ct0W/EYOSFf/nWTauy++DprtY=
github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
+github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ=
+github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
@@ -424,8 +454,12 @@ github.com/spf13/viper v1.7.0 h1:xVKxvI7ouOI5I+U9s2eeiUfMaWBVoXA3AWskkrqK0VM=
github.com/spf13/viper v1.7.0/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg=
github.com/spf13/viper v1.7.1 h1:pM5oEahlgWv/WnHXpgbKz7iLIxRf65tye2Ci+XFK5sk=
github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg=
+github.com/spiral/endure v1.0.0-beta20/go.mod h1:qCU2/4gAItVESzUK0yPExmUTlTcpRLqJUgcV+nqxn+o=
github.com/spiral/endure v1.0.0-beta21 h1:YW3gD6iNhRByG/yFkm/Ko+nj+oTBsjBtPVHFA2nt67k=
github.com/spiral/endure v1.0.0-beta21/go.mod h1:GsItn+dYSO4O5uwvfki23xyxRnmBhxEyL6jBeJQoFPw=
+github.com/spiral/endure v1.0.0-beta9 h1:PNiVit9DCucmhZLd4RgoiVL7Y5DBmweUwadFyulAct8=
+github.com/spiral/endure v1.0.0-beta9/go.mod h1:EhC6CKaSQum/gz1zRqkyu4LqFOlngVTGbXK69pebmxQ=
+github.com/spiral/errors v1.0.4/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o=
github.com/spiral/errors v1.0.5 h1:TwlR9cZtTgnZrSngcEUpyiMO9yJ45gdQ+XcrCRoCCAM=
github.com/spiral/errors v1.0.5/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o=
github.com/spiral/errors v1.0.7/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o=
@@ -438,9 +472,12 @@ github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3
github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/objx v0.3.0 h1:NGXK3lHquSN08v5vWalVI/L8XU9hdzE/G6xsrze47As=
+github.com/stretchr/objx v0.3.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
+github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
@@ -449,6 +486,10 @@ github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
+github.com/uber-go/tally v3.3.17+incompatible h1:nFHIuW3VQ22wItiE9kPXic8dEgExWOsVOHwpmoIvsMw=
+github.com/uber-go/tally v3.3.17+incompatible/go.mod h1:YDTIBxdXyOU/sCWilKB4bgyufu1cEi0jdVnRdxvjnmU=
+github.com/uber/jaeger-client-go v2.23.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
+github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
@@ -464,6 +505,8 @@ github.com/vmihailenco/msgpack v4.0.4+incompatible/go.mod h1:fy3FlTQTDXWkZ7Bh6Ac
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/yookoala/gofast v0.4.0 h1:dLBjghcsbbZNOEHN8N1X/gh9S6srmJed4WQfG7DlKwo=
github.com/yookoala/gofast v0.4.0/go.mod h1:rfbkoKaQG1bnuTUZcmV3vAlnfpF4FTq8WbQJf2vcpg8=
+github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
+github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/gopher-lua v0.0.0-20191220021717-ab39c6098bdb h1:ZkM6LRnq40pR1Ox0hTHlnpkcOTuFIDQpZ1IN8rKKhX0=
github.com/yuin/gopher-lua v0.0.0-20191220021717-ab39c6098bdb/go.mod h1:gqRgreBUhTSL0GeU64rtZ3Uq3wtjOa/TB2YfrtkCbVQ=
go.etcd.io/bbolt v1.3.2 h1:Z/90sZLPOeCy2PwprqkFa25PdkusRzaj9P8zm/KNyvk=
@@ -477,14 +520,22 @@ go.opencensus.io v0.20.2/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
+go.opentelemetry.io/otel v0.15.0/go.mod h1:e4GKElweB8W2gWUqbghw0B8t5MCTccc9212eNHnOHwA=
+go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opentelemetry.io/otel v0.16.0 h1:uIWEbdeb4vpKPGITLsRVUS44L5oDbDUCZxn8lkxhmgw=
go.opentelemetry.io/otel v0.16.0/go.mod h1:e4GKElweB8W2gWUqbghw0B8t5MCTccc9212eNHnOHwA=
+go.temporal.io/api v1.4.0 h1:Ga1Ih8YE5ULs+UGt7u6Ppcf5SUMLyh4BATAs6SyPO0w=
+go.temporal.io/api v1.4.0/go.mod h1:H0yXehwGE9Sn9zVruyy9aumq17SMsK1WmIy4GX3MIKw=
+go.temporal.io/sdk v1.4.0 h1:IDIgfhakfgMv+zOMCQkXyqR7zHH+T4Nt2VAH5qW4w3w=
+go.temporal.io/sdk v1.4.0/go.mod h1:hZ3Jd/Aoom1ao+fRyFf6y3MfHwXdPlhJiykX/gGmBeA=
+go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
+go.uber.org/goleak v1.0.0/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
@@ -520,11 +571,15 @@ golang.org/x/lint v0.0.0-20190409202823-959b441ac422/go.mod h1:6SW0HCj/g11FgYtHl
golang.org/x/lint v0.0.0-20190909230951-414d861bb4ac/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
+golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
+golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE=
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY=
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
+golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
+golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@@ -544,12 +599,16 @@ golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201016165138-7b1cca2348c0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
+golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201216054612-986b41b23924 h1:QsnDpLLOKwHBBDa8nDws4DYNc/ryVW2vCpxCs09d4PY=
golang.org/x/net v0.0.0-20201216054612-986b41b23924/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
+golang.org/x/net v0.0.0-20201224014010-6772e930b67b h1:iFwSg7t5GZmB/Q5TjiEAsdoLDrdJRC1RiF2WhuV29Qw=
+golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -560,6 +619,7 @@ golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -604,6 +664,8 @@ golang.org/x/sys v0.0.0-20201218084310-7d0127a74742 h1:+CBz4km/0KPU3RGTwARGh/noP
golang.org/x/sys v0.0.0-20201218084310-7d0127a74742/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201221093633-bc327ba9c2f0 h1:n+DPcgTwkgWzIFpLmoimYR2K2b0Ga5+Os4kayIN0vGo=
golang.org/x/sys v0.0.0-20201221093633-bc327ba9c2f0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210105210732-16f7687f5001 h1:/dSxr6gT0FNI1MO5WLJo8mTmItROeOKTkDn+7OwWBos=
+golang.org/x/sys v0.0.0-20210105210732-16f7687f5001/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -611,14 +673,19 @@ golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.3.4 h1:0YWbFKbhXG/wIiuHDSKpS0Iy7FSA+u45VtBMfQcFTTc=
+golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
+golang.org/x/time v0.0.0-20201208040808-7e3f01d25324 h1:Hir2P/De0WpUhtrKGGjvSb2YxUgyZ7EFOSLIcSSpiwE=
+golang.org/x/time v0.0.0-20201208040808-7e3f01d25324/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180726210403-bfb5194568d3/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
@@ -636,14 +703,21 @@ golang.org/x/tools v0.0.0-20190911174233-4f2ddba30aff/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc h1:NCy3Ohtk6Iny5V/reW2Ktypo4zIpWBdRJ1uFMjBxdg8=
golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200103221440-774c71fcf114 h1:DnSr2mCsxyCE6ZgIkmcWUQY2R5cH/6wL7eIxEmQOMSE=
golang.org/x/tools v0.0.0-20200103221440-774c71fcf114/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
+golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
+golang.org/x/tools v0.0.0-20200605181038-cef9fc3bc8f0 h1:gxU2P+MOOGAWge5BKP+BzqSeegxvDBRib5rk3yZDDuI=
+golang.org/x/tools v0.0.0-20200605181038-cef9fc3bc8f0/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
+golang.org/x/tools v0.0.0-20210115202250-e0d201561e39/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/api v0.3.1/go.mod h1:6wY9I6uQWHQ8EM57III9mq/AjF+i8G65rmVagqKMtkk=
google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M=
@@ -656,6 +730,7 @@ google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7
google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/appengine v1.6.1 h1:QzqyMA1tlu6CgqCDUtU9V+ZKhLFT2dkJuANu5QaxI3I=
google.golang.org/appengine v1.6.1/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0=
+google.golang.org/genproto v0.0.0-20180518175338-11a468237815/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
google.golang.org/genproto v0.0.0-20190418145605-e7d98fc518a7/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
@@ -667,6 +742,11 @@ google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98
google.golang.org/genproto v0.0.0-20190911173649-1774047e7e51/go.mod h1:IbNlFCBrqXvoKpeg0TB2l7cyZUmoaFKYIwrEpbDKLA8=
google.golang.org/genproto v0.0.0-20191108220845-16a3f7862a1a h1:Ob5/580gVHBJZgXnff1cZDbG+xLtMVE5mDRTe+nIsX4=
google.golang.org/genproto v0.0.0-20191108220845-16a3f7862a1a/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc=
+google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
+google.golang.org/genproto v0.0.0-20201214200347-8c77b98c765d/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
+google.golang.org/genproto v0.0.0-20210106152847-07624b53cd92 h1:jOTk2Z6KYaWoptUFqZ167cS8peoUPjFEXrsqfVkkCGc=
+google.golang.org/genproto v0.0.0-20210106152847-07624b53cd92/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
+google.golang.org/grpc v1.12.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.0/go.mod h1:chYK+tFQF0nDUGJgXMSgLCQk3phJEuONr2DCgLDdAQM=
@@ -677,20 +757,30 @@ google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ij
google.golang.org/grpc v1.22.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.23.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
+google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
google.golang.org/grpc v1.26.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
+google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
+google.golang.org/grpc v1.34.0 h1:raiipEjMOIC/TO2AvyTxP25XFdLxNIBwzDh3FM3XztI=
+google.golang.org/grpc v1.34.0/go.mod h1:WotjhfgOW/POjDeRt8vscBtXq+2VjORFy659qA51WJ8=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
+google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
+google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
+google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4=
+google.golang.org/protobuf v1.25.0 h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4c=
+google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/cheggaaa/pb.v1 v1.0.25/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
@@ -714,6 +804,8 @@ gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.0-20210106172901-c476de37821d h1:827r06Ng1EGlK/5Qb/mj+yHDj6pgKf5CjoX4v24FRJ0=
+gopkg.in/yaml.v3 v3.0.0-20210106172901-c476de37821d/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go
index 4d64ac6d..3672f5ac 100644
--- a/plugins/http/plugin.go
+++ b/plugins/http/plugin.go
@@ -34,7 +34,7 @@ const (
PluginName = "http"
// RR_HTTP env variable key (internal) if the HTTP presents
- RR_HTTP = "RR_HTTP" //nolint:golint,stylecheck
+ RR_MODE = "RR_MODE" //nolint:golint,stylecheck
// HTTPS_SCHEME
HTTPS_SCHEME = "https" //nolint:golint,stylecheck
@@ -101,7 +101,7 @@ func (s *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
s.cfg.Env = make(map[string]string)
}
- s.cfg.Env[RR_HTTP] = "true"
+ s.cfg.Env[RR_MODE] = "http"
s.pool, err = server.NewWorkerPool(context.Background(), pool.Config{
Debug: s.cfg.Pool.Debug,
diff --git a/plugins/temporal/activity/activity_pool.go b/plugins/temporal/activity/activity_pool.go
new file mode 100644
index 00000000..d09722ce
--- /dev/null
+++ b/plugins/temporal/activity/activity_pool.go
@@ -0,0 +1,197 @@
+package activity
+
+import (
+ "context"
+ "sync"
+ "sync/atomic"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/pool"
+ rrWorker "github.com/spiral/roadrunner/v2/pkg/worker"
+ "github.com/spiral/roadrunner/v2/plugins/server"
+ "github.com/spiral/roadrunner/v2/plugins/temporal/client"
+ rrt "github.com/spiral/roadrunner/v2/plugins/temporal/protocol"
+ "go.temporal.io/api/common/v1"
+ "go.temporal.io/sdk/activity"
+ "go.temporal.io/sdk/converter"
+ "go.temporal.io/sdk/internalbindings"
+ "go.temporal.io/sdk/worker"
+)
+
+// RR_MODE env variable
+const RR_MODE = "RR_MODE" //nolint:golint,stylecheck
+// RR_CODEC env variable
+const RR_CODEC = "RR_CODEC" //nolint:golint,stylecheck
+
+//
+const doNotCompleteOnReturn = "doNotCompleteOnReturn"
+
+type activityPool interface {
+ Start(ctx context.Context, temporal client.Temporal) error
+ Destroy(ctx context.Context) error
+ Workers() []rrWorker.SyncWorker
+ ActivityNames() []string
+ GetActivityContext(taskToken []byte) (context.Context, error)
+}
+
+type activityPoolImpl struct {
+ dc converter.DataConverter
+ codec rrt.Codec
+ seqID uint64
+ activities []string
+ wp pool.Pool
+ tWorkers []worker.Worker
+ running sync.Map
+}
+
+// newActivityPool
+func newActivityPool(codec rrt.Codec, listener events.Listener, poolConfig pool.Config, server server.Server) (activityPool, error) {
+ const op = errors.Op("new_activity_pool")
+ // env variables
+ env := map[string]string{RR_MODE: RRMode, RR_CODEC: codec.GetName()}
+ wp, err := server.NewWorkerPool(context.Background(), poolConfig, env, listener)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ return &activityPoolImpl{
+ codec: codec,
+ wp: wp,
+ running: sync.Map{},
+ }, nil
+}
+
+// initWorkers request workers info from underlying PHP and configures temporal workers linked to the pool.
+func (pool *activityPoolImpl) Start(ctx context.Context, temporal client.Temporal) error {
+ const op = errors.Op("activity_pool_start")
+ pool.dc = temporal.GetDataConverter()
+
+ err := pool.initWorkers(ctx, temporal)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ for i := 0; i < len(pool.tWorkers); i++ {
+ err := pool.tWorkers[i].Start()
+ if err != nil {
+ return errors.E(op, err)
+ }
+ }
+
+ return nil
+}
+
+// initWorkers request workers info from underlying PHP and configures temporal workers linked to the pool.
+func (pool *activityPoolImpl) Destroy(ctx context.Context) error {
+ for i := 0; i < len(pool.tWorkers); i++ {
+ pool.tWorkers[i].Stop()
+ }
+
+ pool.wp.Destroy(ctx)
+ return nil
+}
+
+// Workers returns list of all allocated workers.
+func (pool *activityPoolImpl) Workers() []rrWorker.SyncWorker {
+ return pool.wp.Workers()
+}
+
+// ActivityNames returns list of all available activity names.
+func (pool *activityPoolImpl) ActivityNames() []string {
+ return pool.activities
+}
+
+// ActivityNames returns list of all available activity names.
+func (pool *activityPoolImpl) GetActivityContext(taskToken []byte) (context.Context, error) {
+ const op = errors.Op("activity_pool_get_activity_context")
+ c, ok := pool.running.Load(string(taskToken))
+ if !ok {
+ return nil, errors.E(op, errors.Str("heartbeat on non running activity"))
+ }
+
+ return c.(context.Context), nil
+}
+
+// initWorkers request workers workflows from underlying PHP and configures temporal workers linked to the pool.
+func (pool *activityPoolImpl) initWorkers(ctx context.Context, temporal client.Temporal) error {
+ const op = errors.Op("activity_pool_create_temporal_worker")
+
+ workerInfo, err := rrt.FetchWorkerInfo(pool.codec, pool.wp, temporal.GetDataConverter())
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ pool.activities = make([]string, 0)
+ pool.tWorkers = make([]worker.Worker, 0)
+
+ for i := 0; i < len(workerInfo); i++ {
+ w, err := temporal.CreateWorker(workerInfo[i].TaskQueue, workerInfo[i].Options)
+ if err != nil {
+ return errors.E(op, err, pool.Destroy(ctx))
+ }
+
+ pool.tWorkers = append(pool.tWorkers, w)
+ for j := 0; j < len(workerInfo[i].Activities); j++ {
+ w.RegisterActivityWithOptions(pool.executeActivity, activity.RegisterOptions{
+ Name: workerInfo[i].Activities[j].Name,
+ DisableAlreadyRegisteredCheck: false,
+ })
+
+ pool.activities = append(pool.activities, workerInfo[i].Activities[j].Name)
+ }
+ }
+
+ return nil
+}
+
+// executes activity with underlying worker.
+func (pool *activityPoolImpl) executeActivity(ctx context.Context, args *common.Payloads) (*common.Payloads, error) {
+ const op = errors.Op("activity_pool_execute_activity")
+
+ heartbeatDetails := &common.Payloads{}
+ if activity.HasHeartbeatDetails(ctx) {
+ err := activity.GetHeartbeatDetails(ctx, &heartbeatDetails)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+ }
+
+ var info = activity.GetInfo(ctx)
+ var msg = rrt.Message{
+ ID: atomic.AddUint64(&pool.seqID, 1),
+ Command: rrt.InvokeActivity{
+ Name: info.ActivityType.Name,
+ Info: info,
+ HeartbeatDetails: len(heartbeatDetails.Payloads),
+ },
+ Payloads: args,
+ }
+
+ if len(heartbeatDetails.Payloads) != 0 {
+ msg.Payloads.Payloads = append(msg.Payloads.Payloads, heartbeatDetails.Payloads...)
+ }
+
+ pool.running.Store(string(info.TaskToken), ctx)
+ defer pool.running.Delete(string(info.TaskToken))
+
+ result, err := pool.codec.Execute(pool.wp, rrt.Context{TaskQueue: info.TaskQueue}, msg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ if len(result) != 1 {
+ return nil, errors.E(op, errors.Str("invalid activity worker response"))
+ }
+
+ out := result[0]
+ if out.Failure != nil {
+ if out.Failure.Message == doNotCompleteOnReturn {
+ return nil, activity.ErrResultPending
+ }
+
+ return nil, internalbindings.ConvertFailureToError(out.Failure, pool.dc)
+ }
+
+ return out.Payloads, nil
+}
diff --git a/plugins/temporal/activity/plugin.go b/plugins/temporal/activity/plugin.go
new file mode 100644
index 00000000..5e562a8d
--- /dev/null
+++ b/plugins/temporal/activity/plugin.go
@@ -0,0 +1,215 @@
+package activity
+
+import (
+ "context"
+ "time"
+
+ "github.com/cenkalti/backoff/v4"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
+
+ "sync"
+ "sync/atomic"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/server"
+ "github.com/spiral/roadrunner/v2/plugins/temporal/client"
+)
+
+const (
+ // PluginName defines public service name.
+ PluginName = "activities"
+
+ // RRMode sets as RR_MODE env variable to let worker know about the mode to run.
+ RRMode = "temporal/activity"
+)
+
+// Plugin to manage activity execution.
+type Plugin struct {
+ temporal client.Temporal
+ events events.Handler
+ server server.Server
+ log logger.Logger
+ mu sync.Mutex
+ reset chan struct{}
+ pool activityPool
+ closing int64
+}
+
+// Init configures activity service.
+func (p *Plugin) Init(temporal client.Temporal, server server.Server, log logger.Logger) error {
+ const op = errors.Op("activity_plugin_init")
+ if temporal.GetConfig().Activities == nil {
+ // no need to serve activities
+ return errors.E(op, errors.Disabled)
+ }
+
+ p.temporal = temporal
+ p.server = server
+ p.events = events.NewEventsHandler()
+ p.log = log
+ p.reset = make(chan struct{})
+
+ return nil
+}
+
+// Serve activities with underlying workers.
+func (p *Plugin) Serve() chan error {
+ const op = errors.Op("activity_plugin_serve")
+
+ errCh := make(chan error, 1)
+ pool, err := p.startPool()
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
+
+ p.pool = pool
+
+ go func() {
+ for {
+ select {
+ case <-p.reset:
+ if atomic.LoadInt64(&p.closing) == 1 {
+ return
+ }
+
+ err := p.replacePool()
+ if err == nil {
+ continue
+ }
+
+ bkoff := backoff.NewExponentialBackOff()
+ bkoff.InitialInterval = time.Second
+
+ err = backoff.Retry(p.replacePool, bkoff)
+ if err != nil {
+ errCh <- errors.E(op, err)
+ }
+ }
+ }
+ }()
+
+ return errCh
+}
+
+// Stop stops the serving plugin.
+func (p *Plugin) Stop() error {
+ atomic.StoreInt64(&p.closing, 1)
+ const op = errors.Op("activity_plugin_stop")
+
+ pool := p.getPool()
+ if pool != nil {
+ p.pool = nil
+ err := pool.Destroy(context.Background())
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+ }
+
+ return nil
+}
+
+// Name of the service.
+func (p *Plugin) Name() string {
+ return PluginName
+}
+
+// RPC returns associated rpc service.
+func (p *Plugin) RPC() interface{} {
+ return &rpc{srv: p, client: p.temporal.GetClient()}
+}
+
+// Workers returns pool workers.
+func (p *Plugin) Workers() []worker.SyncWorker {
+ return p.getPool().Workers()
+}
+
+// ActivityNames returns list of all available activities.
+func (p *Plugin) ActivityNames() []string {
+ return p.pool.ActivityNames()
+}
+
+// Reset resets underlying workflow pool with new copy.
+func (p *Plugin) Reset() error {
+ p.reset <- struct{}{}
+
+ return nil
+}
+
+// AddListener adds event listeners to the service.
+func (p *Plugin) AddListener(listener events.Listener) {
+ p.events.AddListener(listener)
+}
+
+// AddListener adds event listeners to the service.
+func (p *Plugin) poolListener(event interface{}) {
+ if ev, ok := event.(events.PoolEvent); ok {
+ if ev.Event == events.EventPoolError {
+ p.log.Error("Activity pool error", "error", ev.Payload.(error))
+ p.reset <- struct{}{}
+ }
+ }
+
+ p.events.Push(event)
+}
+
+// AddListener adds event listeners to the service.
+func (p *Plugin) startPool() (activityPool, error) {
+ pool, err := newActivityPool(
+ p.temporal.GetCodec().WithLogger(p.log),
+ p.poolListener,
+ *p.temporal.GetConfig().Activities,
+ p.server,
+ )
+
+ if err != nil {
+ return nil, errors.E(errors.Op("newActivityPool"), err)
+ }
+
+ err = pool.Start(context.Background(), p.temporal)
+ if err != nil {
+ return nil, errors.E(errors.Op("startActivityPool"), err)
+ }
+
+ p.log.Debug("Started activity processing", "activities", pool.ActivityNames())
+
+ return pool, nil
+}
+
+func (p *Plugin) replacePool() error {
+ pool, err := p.startPool()
+ if err != nil {
+ p.log.Error("Replace activity pool failed", "error", err)
+ return errors.E(errors.Op("newActivityPool"), err)
+ }
+
+ p.log.Debug("Replace activity pool")
+
+ var previous activityPool
+
+ p.mu.Lock()
+ previous, p.pool = p.pool, pool
+ p.mu.Unlock()
+
+ errD := previous.Destroy(context.Background())
+ if errD != nil {
+ p.log.Error(
+ "Unable to destroy expired activity pool",
+ "error",
+ errors.E(errors.Op("destroyActivityPool"), errD),
+ )
+ }
+
+ return nil
+}
+
+// getPool returns currently pool.
+func (p *Plugin) getPool() activityPool {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+
+ return p.pool
+}
diff --git a/plugins/temporal/activity/rpc.go b/plugins/temporal/activity/rpc.go
new file mode 100644
index 00000000..49efcd4f
--- /dev/null
+++ b/plugins/temporal/activity/rpc.go
@@ -0,0 +1,66 @@
+package activity
+
+import (
+ v1Proto "github.com/golang/protobuf/proto" //nolint:staticcheck
+ commonpb "go.temporal.io/api/common/v1"
+ "go.temporal.io/sdk/activity"
+ "go.temporal.io/sdk/client"
+ "google.golang.org/protobuf/proto"
+)
+
+/*
+- the method's type is exported.
+- the method is exported.
+- the method has two arguments, both exported (or builtin) types.
+- the method's second argument is a pointer.
+- the method has return type error.
+*/
+type rpc struct {
+ srv *Plugin
+ client client.Client
+}
+
+// RecordHeartbeatRequest sent by activity to record current state.
+type RecordHeartbeatRequest struct {
+ TaskToken []byte `json:"taskToken"`
+ Details []byte `json:"details"`
+}
+
+// RecordHeartbeatResponse sent back to the worker to indicate that activity was cancelled.
+type RecordHeartbeatResponse struct {
+ Canceled bool `json:"canceled"`
+}
+
+// RecordActivityHeartbeat records heartbeat for an activity.
+// taskToken - is the value of the binary "TaskToken" field of the "ActivityInfo" struct retrieved inside the activity.
+// details - is the progress you want to record along with heart beat for this activity.
+// The errors it can return:
+// - EntityNotExistsError
+// - InternalServiceError
+// - CanceledError
+func (r *rpc) RecordActivityHeartbeat(in RecordHeartbeatRequest, out *RecordHeartbeatResponse) error {
+ details := &commonpb.Payloads{}
+
+ if len(in.Details) != 0 {
+ if err := proto.Unmarshal(in.Details, v1Proto.MessageV2(details)); err != nil {
+ return err
+ }
+ }
+
+ // find running activity
+ ctx, err := r.srv.getPool().GetActivityContext(in.TaskToken)
+ if err != nil {
+ return err
+ }
+
+ activity.RecordHeartbeat(ctx, details)
+
+ select {
+ case <-ctx.Done():
+ *out = RecordHeartbeatResponse{Canceled: true}
+ default:
+ *out = RecordHeartbeatResponse{Canceled: false}
+ }
+
+ return nil
+}
diff --git a/plugins/temporal/client/doc/doc.go b/plugins/temporal/client/doc/doc.go
new file mode 100644
index 00000000..10257070
--- /dev/null
+++ b/plugins/temporal/client/doc/doc.go
@@ -0,0 +1 @@
+package doc
diff --git a/plugins/temporal/client/doc/temporal.drawio b/plugins/temporal/client/doc/temporal.drawio
new file mode 100644
index 00000000..f2350af8
--- /dev/null
+++ b/plugins/temporal/client/doc/temporal.drawio
@@ -0,0 +1 @@
+<mxfile host="Electron" modified="2020-10-20T11:17:09.390Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/13.7.9 Chrome/85.0.4183.121 Electron/10.1.3 Safari/537.36" etag="U-Sm3BmbD4KaqDkjKEOx" version="13.7.9" type="device"><diagram name="Page-1" id="6133507b-19e7-1e82-6fc7-422aa6c4b21f">7V1tc6M2EP41nsl1Jh5eDIaPsZPcdXptc83N5dpvspFtNRhRkGP7fn0lkDAgSEgCxrkoH2K0SEJon919VrwNzOl69zEC4ep37EF/YGjebmBeDgxD17UR/WGSPZdohpVKlhHyuOwguEU/oKjIpRvkwbhQkWDsExQWhXMcBHBOCjIQRXhbrLbAfvGoIVhCSXA7B74svUMeWaVSwzTtw45PEC1X4tC2xU95Bub3ywhvAn7AgWEukr909xqIzviZxivg4W1OZF4NzGmEMUm31rsp9NnsinlL213X7M0GHsGANGkwc0ezuW46c+A6GlycO3xYZC/mAnp0angRR2SFlzgA/tVBOklOF7IONVpakbVPN3W66YMZ9CfZjEyxjyO6K8ABaxYTEJELpq2S7Br5Pu8sKXN8OLQMA6/Ugkpy9WmJ19ZtWlzggFyDNfIZEL/ByAMB4GJRjc74RJ40Po8x3kRzPg9jx7V13XIXmg1dBzrnI5MjE0RLSGoq6eO0EpvFXNdcGR8hXkMS7WmFCPqAoIciAAHH8TKrx5vSSQD7XIUQo4DEuZ5vmIBWyGxyzAHHTdLR3CIuntmAbqRjEKXcyRxECdiaAU/nHuIB+Bs+CdpwqEtwJHBHiiiLYIx+gFlSgWGAzwWtbU0G1iWVAB8tAyqYU/1CisDJA4wIouZ+wXeskeclUK5B7MGKZfBUYkwCVI2dsYHAXUHn3KfxEyq4hQJgeKtzbWiNR1ZBV+cO91PPA5UEAqcIgXGpA7xYxBT4RVS8Gge2hAMFgqdAoA0dV3gagQG9FQhkdpl16x4BBO6biUIeAmsceF9XKOg1GDlGg2AkIlZ7waig8+dYuStb+XD4y89q6OJs2/D2zsjuxNKFSvaV7bvx9Zqk8FO1cxzCN2DgrnUqBi7ymyKd+2kNXCC5FQunarXaMWm95Ci0Ixi1GOZ7IHDtqV0b2q5b6YFfiQGjBAHjGH5dQsDAsH2m8LAAA/u/DVvnmKypT2P86YLu1cId/Z+4SS2VnxMcpvtGuX0MQeccEmwfR0XWJ91a8t/kyCgnAOuQoUQu/cr6WADqhIW42KTQI50VVD7KLJIkQvAVrkMcAT/XfFauTGVhWfaiKfPhguTmrG5S5OFWjWAVsSGIRTrRl17fbTdDZnU/QjL1EVX12QdaOpsn28NUNDCmLPTRgB19qDpB0R4HC7RM2qebNXW5yoNZzH6mEQQE3uHoHkZnBMT3XzaQgpudZ4SCZXpsHBKEg5hubZOKwz9TQTJWLkq7qBprcepLDrPs6xKbyFxiOncTTGst/ISvLBKW8ribm1K+gdhYtD/gtuSNCyQrW6RkhTleoznffopyRfg+W0DV+eFz3pj9XV8/Rokkx1rrQEduabGCF7eH9VtLrGes8ku35aD4EvZjmJZl2WPHAI4JbU+r8IBTH8RxANZQUm28RWsfJDOW00CiOV5JbxL9UkTMV8j3PoM93rDzo4R2fi9KkxWO0A/aLRDHyPNdwy7UuGUtuZaT0AxvhHayaA0/g5hkqPB9EMZolg04NesJJgSvBY74mTIKXRWUu0Jd7nC8UiOq/npQ2qMiKHVNXCfIwVK3q2BpOx3A0qgIzMwtLhBkV0iY8yX7UIaoTNfSWS7PqjzRJQ9VDdo4BHPqRD8ndS5HB8lffEKYaLtCBN6GIMmQthEIiw5vRclfkqJFmABSSyQndK6n2tBilJL6YIuFMVFOWCaN0SxE0PMDKNE7pCDfwvjglfOIqDX8pyHCIWHYzQAh6rWKB1Ph4dTwYBk94kG+BqPw0DMexk6PeJCvxUiq91GZuTQKwE/oPUvluaK/MhxcnusSGEwZDGaFkhPKcINjxPg4lUVp3YpVhmPrN7tZ4ikFd8EHxjX2TnteYe+M2foHZfa9wEI/YhyovWtB5S8qfykk1ZacVVenL3oL7kqCpaH4asuOqt7yTzKBkRExUog4OUQcM4WRhyvfWaA4a9sabpqUdBIEFGk9Wct3+owFI5kdKNL6DkmrbozLrLXxovuoC1wqjtK1p8pM/22w1pFaZz09RPTKWq2qW2YUa21Xw32yVoEuxVpPz/KPyVqluyUVLE4VFkddgi/jom6lU+Gif1yYR7wkK43Xlte3VJb7DrNcxy4muc1vLXONDmApUm6V0nTmqTLLfxtJ7rhqnValNO1quGnS2kVKI55IU5HofUciab2VhiKzGSz1URexyFELrl3Hosz230YscqouCitE9IuIXhdc3Sq+qthJuxruc8HVrQsCagWld8vvc8FVweJkYdHvPc+ufFFWpTPvMZ2x9JenMy28SUUGZtVTOIq8tumrMtt/G+mMrslLLwoSfUOi39uetaorhiqhaVfFfSY02YOCirqenu33mdHU3UyoYNE7LHp+ilNzJGiolOY9pjSa+9KbBfTy+35bQqZakO/86R2t7vXSJ5rU6OoWkhPERL9ZTcXb4FVW07qOe01rKt41ofjrqVj/MfOai5vb8fe7y4urL1eju+Cf6NO33+yKFylezAl6QGR/C6MH9vLKehr7FrhrKvod7AoVn8toH32tZxsE0ijxR4GBHC6yu4AK9NFswWdUAqPBCujhjbsaOrzYt9p9HN9nHNk/PAqRoqVIeHnUMl/DLisx05kvafDUZfp23mswJzgZusJN/7ipYKDHxU2D628iKN1grNzNacCmgtQeFzbNL8j8vPnKM4lJO5qveqVipeq74ibtXKh53z6iM3BULMkf1y+8/hObhzqfcWLmzMr/hYQmRWkmAjYEF3NiuEPke277b9YV1VNauhTpR1LYi0JAT/d7vpC2MixRPrRLSqLhy9KR/OePHqP8+S8g1Xx5pPPvH9Hi4Rut6RdTDp/CNa/+Bw==</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/temporal/client/plugin.go b/plugins/temporal/client/plugin.go
new file mode 100644
index 00000000..047a1815
--- /dev/null
+++ b/plugins/temporal/client/plugin.go
@@ -0,0 +1,169 @@
+package client
+
+import (
+ "fmt"
+ "os"
+ "sync/atomic"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/pool"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ rrt "github.com/spiral/roadrunner/v2/plugins/temporal/protocol"
+ "go.temporal.io/sdk/client"
+ "go.temporal.io/sdk/converter"
+ "go.temporal.io/sdk/worker"
+)
+
+// PluginName defines public service name.
+const PluginName = "temporal"
+
+// indicates that the case size was set
+var stickyCacheSet = false
+
+// Plugin implement Temporal contract.
+type Plugin struct {
+ workerID int32
+ cfg *Config
+ dc converter.DataConverter
+ log logger.Logger
+ client client.Client
+}
+
+// Temporal define common interface for RoadRunner plugins.
+type Temporal interface {
+ GetClient() client.Client
+ GetDataConverter() converter.DataConverter
+ GetConfig() Config
+ GetCodec() rrt.Codec
+ CreateWorker(taskQueue string, options worker.Options) (worker.Worker, error)
+}
+
+// Config of the temporal client and depended services.
+type Config struct {
+ Address string
+ Namespace string
+ Activities *pool.Config
+ Codec string
+ DebugLevel int `mapstructure:"debug_level"`
+ CacheSize int `mapstructure:"cache_size"`
+}
+
+// Init initiates temporal client plugin.
+func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
+ const op = errors.Op("temporal_client_plugin_init")
+ p.log = log
+ p.dc = rrt.NewDataConverter(converter.GetDefaultDataConverter())
+ err := cfg.UnmarshalKey(PluginName, &p.cfg)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ if p.cfg == nil {
+ return errors.E(op, errors.Disabled)
+ }
+
+ return nil
+}
+
+// GetConfig returns temporal configuration.
+func (p *Plugin) GetConfig() Config {
+ if p.cfg != nil {
+ return *p.cfg
+ }
+ // empty
+ return Config{}
+}
+
+// GetCodec returns communication codec.
+func (p *Plugin) GetCodec() rrt.Codec {
+ if p.cfg.Codec == "json" {
+ return rrt.NewJSONCodec(rrt.DebugLevel(p.cfg.DebugLevel), p.log)
+ }
+
+ // production ready protocol, no debug abilities
+ return rrt.NewProtoCodec()
+}
+
+// GetDataConverter returns data active data converter.
+func (p *Plugin) GetDataConverter() converter.DataConverter {
+ return p.dc
+}
+
+// Serve starts temporal srv.
+func (p *Plugin) Serve() chan error {
+ const op = errors.Op("temporal_client_plugin_serve")
+ errCh := make(chan error, 1)
+ var err error
+
+ if stickyCacheSet == false && p.cfg.CacheSize != 0 {
+ worker.SetStickyWorkflowCacheSize(p.cfg.CacheSize)
+ stickyCacheSet = true
+ }
+
+ p.client, err = client.NewClient(client.Options{
+ Logger: p.log,
+ HostPort: p.cfg.Address,
+ Namespace: p.cfg.Namespace,
+ DataConverter: p.dc,
+ })
+
+ if err != nil {
+ errCh <- errors.E(op, err)
+ }
+
+ p.log.Debug("connected to temporal server", "address", p.cfg.Address)
+
+ return errCh
+}
+
+// Stop stops temporal srv connection.
+func (p *Plugin) Stop() error {
+ if p.client != nil {
+ p.client.Close()
+ }
+
+ return nil
+}
+
+// GetClient returns active srv connection.
+func (p *Plugin) GetClient() client.Client {
+ return p.client
+}
+
+// CreateWorker allocates new temporal worker on an active connection.
+func (p *Plugin) CreateWorker(tq string, options worker.Options) (worker.Worker, error) {
+ const op = errors.Op("temporal_client_plugin_create_worker")
+ if p.client == nil {
+ return nil, errors.E(op, errors.Str("unable to create worker, invalid temporal client"))
+ }
+
+ if options.Identity == "" {
+ if tq == "" {
+ tq = client.DefaultNamespace
+ }
+
+ // ensures unique worker IDs
+ options.Identity = fmt.Sprintf(
+ "%d@%s@%s@%v",
+ os.Getpid(),
+ getHostName(),
+ tq,
+ atomic.AddInt32(&p.workerID, 1),
+ )
+ }
+
+ return worker.New(p.client, tq, options), nil
+}
+
+// Name of the service.
+func (p *Plugin) Name() string {
+ return PluginName
+}
+
+func getHostName() string {
+ hostName, err := os.Hostname()
+ if err != nil {
+ hostName = "Unknown"
+ }
+ return hostName
+}
diff --git a/plugins/temporal/protocol/converter.go b/plugins/temporal/protocol/converter.go
new file mode 100644
index 00000000..406e70f4
--- /dev/null
+++ b/plugins/temporal/protocol/converter.go
@@ -0,0 +1,76 @@
+package protocol
+
+import (
+ commonpb "go.temporal.io/api/common/v1"
+ "go.temporal.io/sdk/converter"
+)
+
+type (
+ // DataConverter wraps Temporal data converter to enable direct access to the payloads.
+ DataConverter struct {
+ fallback converter.DataConverter
+ }
+)
+
+// NewDataConverter creates new data converter.
+func NewDataConverter(fallback converter.DataConverter) converter.DataConverter {
+ return &DataConverter{fallback: fallback}
+}
+
+// ToPayloads converts a list of values.
+func (r *DataConverter) ToPayloads(values ...interface{}) (*commonpb.Payloads, error) {
+ for _, v := range values {
+ if aggregated, ok := v.(*commonpb.Payloads); ok {
+ // bypassing
+ return aggregated, nil
+ }
+ }
+
+ return r.fallback.ToPayloads(values...)
+}
+
+// ToPayload converts single value to payload.
+func (r *DataConverter) ToPayload(value interface{}) (*commonpb.Payload, error) {
+ return r.fallback.ToPayload(value)
+}
+
+// FromPayloads converts to a list of values of different types.
+// Useful for deserializing arguments of function invocations.
+func (r *DataConverter) FromPayloads(payloads *commonpb.Payloads, valuePtrs ...interface{}) error {
+ if payloads == nil {
+ return nil
+ }
+
+ if len(valuePtrs) == 1 {
+ // input proxying
+ if input, ok := valuePtrs[0].(**commonpb.Payloads); ok {
+ *input = &commonpb.Payloads{}
+ (*input).Payloads = payloads.Payloads
+ return nil
+ }
+ }
+
+ for i := 0; i < len(payloads.Payloads); i++ {
+ err := r.FromPayload(payloads.Payloads[i], valuePtrs[i])
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+// FromPayload converts single value from payload.
+func (r *DataConverter) FromPayload(payload *commonpb.Payload, valuePtr interface{}) error {
+ return r.fallback.FromPayload(payload, valuePtr)
+}
+
+// ToString converts payload object into human readable string.
+func (r *DataConverter) ToString(input *commonpb.Payload) string {
+ return r.fallback.ToString(input)
+}
+
+// ToStrings converts payloads object into human readable strings.
+func (r *DataConverter) ToStrings(input *commonpb.Payloads) []string {
+ return r.fallback.ToStrings(input)
+}
diff --git a/plugins/temporal/protocol/converter_test.go b/plugins/temporal/protocol/converter_test.go
new file mode 100644
index 00000000..6ce9fa0f
--- /dev/null
+++ b/plugins/temporal/protocol/converter_test.go
@@ -0,0 +1,23 @@
+package protocol
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "go.temporal.io/api/common/v1"
+ "go.temporal.io/sdk/converter"
+)
+
+func Test_Passthough(t *testing.T) {
+ codec := NewDataConverter(converter.GetDefaultDataConverter())
+
+ value, err := codec.ToPayloads("test")
+ assert.NoError(t, err)
+
+ out := &common.Payloads{}
+
+ assert.Len(t, out.Payloads, 0)
+ assert.NoError(t, codec.FromPayloads(value, &out))
+
+ assert.Len(t, out.Payloads, 1)
+}
diff --git a/plugins/temporal/protocol/internal/protocol.pb.go b/plugins/temporal/protocol/internal/protocol.pb.go
new file mode 100644
index 00000000..c554e28f
--- /dev/null
+++ b/plugins/temporal/protocol/internal/protocol.pb.go
@@ -0,0 +1,167 @@
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// source: protocol.proto
+
+package internal
+
+import (
+ fmt "fmt"
+ math "math"
+
+ proto "github.com/golang/protobuf/proto"
+ v11 "go.temporal.io/api/common/v1"
+ v1 "go.temporal.io/api/failure/v1"
+)
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
+
+type Frame struct {
+ Messages []*Message `protobuf:"bytes,1,rep,name=messages,proto3" json:"messages,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *Frame) Reset() { *m = Frame{} }
+func (m *Frame) String() string { return proto.CompactTextString(m) }
+func (*Frame) ProtoMessage() {}
+func (*Frame) Descriptor() ([]byte, []int) {
+ return fileDescriptor_2bc2336598a3f7e0, []int{0}
+}
+
+func (m *Frame) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_Frame.Unmarshal(m, b)
+}
+func (m *Frame) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_Frame.Marshal(b, m, deterministic)
+}
+func (m *Frame) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_Frame.Merge(m, src)
+}
+func (m *Frame) XXX_Size() int {
+ return xxx_messageInfo_Frame.Size(m)
+}
+func (m *Frame) XXX_DiscardUnknown() {
+ xxx_messageInfo_Frame.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_Frame proto.InternalMessageInfo
+
+func (m *Frame) GetMessages() []*Message {
+ if m != nil {
+ return m.Messages
+ }
+ return nil
+}
+
+// Single communication message.
+type Message struct {
+ Id uint64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"`
+ // command name (if any)
+ Command string `protobuf:"bytes,2,opt,name=command,proto3" json:"command,omitempty"`
+ // command options in json format.
+ Options []byte `protobuf:"bytes,3,opt,name=options,proto3" json:"options,omitempty"`
+ // error response.
+ Failure *v1.Failure `protobuf:"bytes,4,opt,name=failure,proto3" json:"failure,omitempty"`
+ // invocation or result payloads.
+ Payloads *v11.Payloads `protobuf:"bytes,5,opt,name=payloads,proto3" json:"payloads,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *Message) Reset() { *m = Message{} }
+func (m *Message) String() string { return proto.CompactTextString(m) }
+func (*Message) ProtoMessage() {}
+func (*Message) Descriptor() ([]byte, []int) {
+ return fileDescriptor_2bc2336598a3f7e0, []int{1}
+}
+
+func (m *Message) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_Message.Unmarshal(m, b)
+}
+func (m *Message) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_Message.Marshal(b, m, deterministic)
+}
+func (m *Message) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_Message.Merge(m, src)
+}
+func (m *Message) XXX_Size() int {
+ return xxx_messageInfo_Message.Size(m)
+}
+func (m *Message) XXX_DiscardUnknown() {
+ xxx_messageInfo_Message.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_Message proto.InternalMessageInfo
+
+func (m *Message) GetId() uint64 {
+ if m != nil {
+ return m.Id
+ }
+ return 0
+}
+
+func (m *Message) GetCommand() string {
+ if m != nil {
+ return m.Command
+ }
+ return ""
+}
+
+func (m *Message) GetOptions() []byte {
+ if m != nil {
+ return m.Options
+ }
+ return nil
+}
+
+func (m *Message) GetFailure() *v1.Failure {
+ if m != nil {
+ return m.Failure
+ }
+ return nil
+}
+
+func (m *Message) GetPayloads() *v11.Payloads {
+ if m != nil {
+ return m.Payloads
+ }
+ return nil
+}
+
+func init() {
+ proto.RegisterType((*Frame)(nil), "temporal.roadrunner.internal.Frame")
+ proto.RegisterType((*Message)(nil), "temporal.roadrunner.internal.Message")
+}
+
+func init() { proto.RegisterFile("protocol.proto", fileDescriptor_2bc2336598a3f7e0) }
+
+var fileDescriptor_2bc2336598a3f7e0 = []byte{
+ // 257 bytes of a gzipped FileDescriptorProto
+ 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x90, 0x41, 0x4b, 0x33, 0x31,
+ 0x10, 0x86, 0xc9, 0xb6, 0xfd, 0xb6, 0x5f, 0x2a, 0x3d, 0xe4, 0x14, 0x8a, 0x87, 0x50, 0x2c, 0xec,
+ 0x29, 0x4b, 0xeb, 0x4d, 0xbc, 0x78, 0xe9, 0x41, 0x10, 0x24, 0xff, 0x60, 0x6c, 0xa2, 0x04, 0x92,
+ 0x4c, 0x48, 0xb6, 0x05, 0x7f, 0xa1, 0x7f, 0x4b, 0xb6, 0x9b, 0xad, 0x88, 0xe2, 0x6d, 0x26, 0xef,
+ 0xfb, 0x84, 0x87, 0xa1, 0xcb, 0x98, 0xb0, 0xc3, 0x03, 0x3a, 0x79, 0x1e, 0xd8, 0x75, 0x67, 0x7c,
+ 0xc4, 0x04, 0x4e, 0x26, 0x04, 0x9d, 0x8e, 0x21, 0x98, 0x24, 0x6d, 0xe8, 0x4c, 0x0a, 0xe0, 0x56,
+ 0x37, 0x63, 0xda, 0x42, 0xb4, 0xed, 0x01, 0xbd, 0xc7, 0xd0, 0x9e, 0xb6, 0xad, 0x37, 0x39, 0xc3,
+ 0x9b, 0x19, 0xfe, 0x58, 0x6d, 0xbe, 0xb5, 0x5e, 0xc1, 0xba, 0x63, 0x32, 0x3f, 0x6a, 0xeb, 0x47,
+ 0x3a, 0xdb, 0x27, 0xf0, 0x86, 0x3d, 0xd0, 0x79, 0x49, 0x32, 0x27, 0x62, 0xd2, 0x2c, 0x76, 0x1b,
+ 0xf9, 0x97, 0x86, 0x7c, 0x1a, 0xda, 0xea, 0x82, 0xad, 0x3f, 0x08, 0xad, 0xcb, 0x2b, 0x5b, 0xd2,
+ 0xca, 0x6a, 0x4e, 0x04, 0x69, 0xa6, 0xaa, 0xb2, 0x9a, 0x71, 0x5a, 0xf7, 0xa6, 0x10, 0x34, 0xaf,
+ 0x04, 0x69, 0xfe, 0xab, 0x71, 0xed, 0x13, 0x8c, 0x9d, 0xc5, 0x90, 0xf9, 0x44, 0x90, 0xe6, 0x4a,
+ 0x8d, 0x2b, 0xbb, 0xa3, 0x75, 0xf1, 0xe6, 0x53, 0x41, 0x9a, 0xc5, 0x4e, 0x7c, 0x19, 0x41, 0xb4,
+ 0xb2, 0x84, 0xf2, 0xb4, 0x95, 0xfb, 0x61, 0x54, 0x23, 0xc0, 0xee, 0xe9, 0x3c, 0xc2, 0xbb, 0x43,
+ 0xd0, 0x99, 0xcf, 0x7e, 0x83, 0x87, 0xbb, 0xf5, 0xec, 0x73, 0xe9, 0xa9, 0x0b, 0xf1, 0xf2, 0xef,
+ 0x7c, 0x9c, 0xdb, 0xcf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x59, 0xb1, 0x79, 0xd4, 0x99, 0x01, 0x00,
+ 0x00,
+}
diff --git a/plugins/temporal/protocol/json_codec.go b/plugins/temporal/protocol/json_codec.go
new file mode 100644
index 00000000..e7a77068
--- /dev/null
+++ b/plugins/temporal/protocol/json_codec.go
@@ -0,0 +1,225 @@
+package protocol
+
+import (
+ "github.com/fatih/color"
+ j "github.com/json-iterator/go"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "go.temporal.io/api/common/v1"
+ "go.temporal.io/api/failure/v1"
+)
+
+var json = j.ConfigCompatibleWithStandardLibrary
+
+// JSONCodec can be used for debugging and log capturing reasons.
+type JSONCodec struct {
+ // level enables verbose logging or all incoming and outcoming messages.
+ level DebugLevel
+
+ // logger renders messages when debug enabled.
+ logger logger.Logger
+}
+
+// jsonFrame contains message command in binary form.
+type jsonFrame struct {
+ // ID contains ID of the command, response or error.
+ ID uint64 `json:"id"`
+
+ // Command name. Optional.
+ Command string `json:"command,omitempty"`
+
+ // Options to be unmarshalled to body (raw payload).
+ Options j.RawMessage `json:"options,omitempty"`
+
+ // Failure associated with command id.
+ Failure []byte `json:"failure,omitempty"`
+
+ // Payloads specific to the command or result.
+ Payloads []byte `json:"payloads,omitempty"`
+}
+
+// NewJSONCodec creates new Json communication codec.
+func NewJSONCodec(level DebugLevel, logger logger.Logger) Codec {
+ return &JSONCodec{
+ level: level,
+ logger: logger,
+ }
+}
+
+// WithLogger creates new codes instance with attached logger.
+func (c *JSONCodec) WithLogger(logger logger.Logger) Codec {
+ return &JSONCodec{
+ level: c.level,
+ logger: logger,
+ }
+}
+
+// GetName returns codec name.
+func (c *JSONCodec) GetName() string {
+ return "json"
+}
+
+// Execute exchanges commands with worker.
+func (c *JSONCodec) Execute(e Endpoint, ctx Context, msg ...Message) ([]Message, error) {
+ const op = errors.Op("json_codec_execute")
+ if len(msg) == 0 {
+ return nil, nil
+ }
+
+ var response = make([]jsonFrame, 0, 5)
+ var result = make([]Message, 0, 5)
+ var err error
+
+ frames := make([]jsonFrame, 0, len(msg))
+ for _, m := range msg {
+ frame, err := c.packFrame(m)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ frames = append(frames, frame)
+ }
+
+ p := payload.Payload{}
+
+ if ctx.IsEmpty() {
+ p.Context = []byte("null")
+ }
+
+ p.Context, err = json.Marshal(ctx)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ p.Body, err = json.Marshal(frames)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ if c.level >= DebugNormal {
+ logMessage := string(p.Body) + " " + string(p.Context)
+ if c.level >= DebugHumanized {
+ logMessage = color.GreenString(logMessage)
+ }
+
+ c.logger.Debug(logMessage)
+ }
+
+ out, err := e.Exec(p)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ if len(out.Body) == 0 {
+ // worker inactive or closed
+ return nil, nil
+ }
+
+ if c.level >= DebugNormal {
+ logMessage := string(out.Body)
+ if c.level >= DebugHumanized {
+ logMessage = color.HiYellowString(logMessage)
+ }
+
+ c.logger.Debug(logMessage, "receive", true)
+ }
+
+ err = json.Unmarshal(out.Body, &response)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ for _, f := range response {
+ msg, err := c.parseFrame(f)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ result = append(result, msg)
+ }
+
+ return result, nil
+}
+
+func (c *JSONCodec) packFrame(msg Message) (jsonFrame, error) {
+ var (
+ err error
+ frame jsonFrame
+ )
+
+ frame.ID = msg.ID
+
+ if msg.Payloads != nil {
+ frame.Payloads, err = msg.Payloads.Marshal()
+ if err != nil {
+ return jsonFrame{}, err
+ }
+ }
+
+ if msg.Failure != nil {
+ frame.Failure, err = msg.Failure.Marshal()
+ if err != nil {
+ return jsonFrame{}, err
+ }
+ }
+
+ if msg.Command == nil {
+ return frame, nil
+ }
+
+ frame.Command, err = commandName(msg.Command)
+ if err != nil {
+ return jsonFrame{}, err
+ }
+
+ frame.Options, err = json.Marshal(msg.Command)
+ if err != nil {
+ return jsonFrame{}, err
+ }
+
+ return frame, nil
+}
+
+func (c *JSONCodec) parseFrame(frame jsonFrame) (Message, error) {
+ var (
+ err error
+ msg Message
+ )
+
+ msg.ID = frame.ID
+
+ if frame.Payloads != nil {
+ msg.Payloads = &common.Payloads{}
+
+ err = msg.Payloads.Unmarshal(frame.Payloads)
+ if err != nil {
+ return Message{}, err
+ }
+ }
+
+ if frame.Failure != nil {
+ msg.Failure = &failure.Failure{}
+
+ err = msg.Failure.Unmarshal(frame.Failure)
+ if err != nil {
+ return Message{}, err
+ }
+ }
+
+ if frame.Command != "" {
+ cmd, err := initCommand(frame.Command)
+ if err != nil {
+ return Message{}, err
+ }
+
+ err = json.Unmarshal(frame.Options, &cmd)
+ if err != nil {
+ return Message{}, err
+ }
+
+ msg.Command = cmd
+ }
+
+ return msg, nil
+}
diff --git a/plugins/temporal/protocol/message.go b/plugins/temporal/protocol/message.go
new file mode 100644
index 00000000..d5e0f49d
--- /dev/null
+++ b/plugins/temporal/protocol/message.go
@@ -0,0 +1,334 @@
+package protocol
+
+import (
+ "time"
+
+ "github.com/spiral/errors"
+ commonpb "go.temporal.io/api/common/v1"
+ "go.temporal.io/sdk/activity"
+ bindings "go.temporal.io/sdk/internalbindings"
+ "go.temporal.io/sdk/workflow"
+)
+
+const (
+ getWorkerInfoCommand = "GetWorkerInfo"
+
+ invokeActivityCommand = "InvokeActivity"
+ startWorkflowCommand = "StartWorkflow"
+ invokeSignalCommand = "InvokeSignal"
+ invokeQueryCommand = "InvokeQuery"
+ destroyWorkflowCommand = "DestroyWorkflow"
+ cancelWorkflowCommand = "CancelWorkflow"
+ getStackTraceCommand = "StackTrace"
+
+ executeActivityCommand = "ExecuteActivity"
+ executeChildWorkflowCommand = "ExecuteChildWorkflow"
+ getChildWorkflowExecutionCommand = "GetChildWorkflowExecution"
+
+ newTimerCommand = "NewTimer"
+ sideEffectCommand = "SideEffect"
+ getVersionCommand = "GetVersion"
+ completeWorkflowCommand = "CompleteWorkflow"
+ continueAsNewCommand = "ContinueAsNew"
+
+ signalExternalWorkflowCommand = "SignalExternalWorkflow"
+ cancelExternalWorkflowCommand = "CancelExternalWorkflow"
+
+ cancelCommand = "Cancel"
+ panicCommand = "Panic"
+)
+
+// GetWorkerInfo reads worker information.
+type GetWorkerInfo struct{}
+
+// InvokeActivity invokes activity.
+type InvokeActivity struct {
+ // Name defines activity name.
+ Name string `json:"name"`
+
+ // Info contains execution context.
+ Info activity.Info `json:"info"`
+
+ // HeartbeatDetails indicates that the payload also contains last heartbeat details.
+ HeartbeatDetails int `json:"heartbeatDetails,omitempty"`
+}
+
+// StartWorkflow sends worker command to start workflow.
+type StartWorkflow struct {
+ // Info to define workflow context.
+ Info *workflow.Info `json:"info"`
+
+ // LastCompletion contains offset of last completion results.
+ LastCompletion int `json:"lastCompletion,omitempty"`
+}
+
+// InvokeSignal invokes signal with a set of arguments.
+type InvokeSignal struct {
+ // RunID workflow run id.
+ RunID string `json:"runId"`
+
+ // Name of the signal.
+ Name string `json:"name"`
+}
+
+// InvokeQuery invokes query with a set of arguments.
+type InvokeQuery struct {
+ // RunID workflow run id.
+ RunID string `json:"runId"`
+ // Name of the query.
+ Name string `json:"name"`
+}
+
+// CancelWorkflow asks worker to gracefully stop workflow, if possible (signal).
+type CancelWorkflow struct {
+ // RunID workflow run id.
+ RunID string `json:"runId"`
+}
+
+// DestroyWorkflow asks worker to offload workflow from memory.
+type DestroyWorkflow struct {
+ // RunID workflow run id.
+ RunID string `json:"runId"`
+}
+
+// GetStackTrace asks worker to offload workflow from memory.
+type GetStackTrace struct {
+ // RunID workflow run id.
+ RunID string `json:"runId"`
+}
+
+// ExecuteActivity command by workflow worker.
+type ExecuteActivity struct {
+ // Name defines activity name.
+ Name string `json:"name"`
+ // Options to run activity.
+ Options bindings.ExecuteActivityOptions `json:"options,omitempty"`
+}
+
+// ExecuteChildWorkflow executes child workflow.
+type ExecuteChildWorkflow struct {
+ // Name defines workflow name.
+ Name string `json:"name"`
+ // Options to run activity.
+ Options bindings.WorkflowOptions `json:"options,omitempty"`
+}
+
+// GetChildWorkflowExecution returns the WorkflowID and RunId of child workflow.
+type GetChildWorkflowExecution struct {
+ // ID of child workflow command.
+ ID uint64 `json:"id"`
+}
+
+// NewTimer starts new timer.
+type NewTimer struct {
+ // Milliseconds defines timer duration.
+ Milliseconds int `json:"ms"`
+}
+
+// SideEffect to be recorded into the history.
+type SideEffect struct{}
+
+// GetVersion requests version marker.
+type GetVersion struct {
+ ChangeID string `json:"changeID"`
+ MinSupported int `json:"minSupported"`
+ MaxSupported int `json:"maxSupported"`
+}
+
+// CompleteWorkflow sent by worker to complete workflow. Might include additional error as part of the payload.
+type CompleteWorkflow struct{}
+
+// ContinueAsNew restarts workflow with new running instance.
+type ContinueAsNew struct {
+ // Result defines workflow execution result.
+ Name string `json:"name"`
+
+ // Options for continued as new workflow.
+ Options struct {
+ TaskQueueName string
+ WorkflowExecutionTimeout time.Duration
+ WorkflowRunTimeout time.Duration
+ WorkflowTaskTimeout time.Duration
+ } `json:"options"`
+}
+
+// SignalExternalWorkflow sends signal to external workflow.
+type SignalExternalWorkflow struct {
+ Namespace string `json:"namespace"`
+ WorkflowID string `json:"workflowID"`
+ RunID string `json:"runID"`
+ Signal string `json:"signal"`
+ ChildWorkflowOnly bool `json:"childWorkflowOnly"`
+}
+
+// CancelExternalWorkflow canceller external workflow.
+type CancelExternalWorkflow struct {
+ Namespace string `json:"namespace"`
+ WorkflowID string `json:"workflowID"`
+ RunID string `json:"runID"`
+}
+
+// Cancel one or multiple internal promises (activities, local activities, timers, child workflows).
+type Cancel struct {
+ // CommandIDs to be cancelled.
+ CommandIDs []uint64 `json:"ids"`
+}
+
+// Panic triggers panic in workflow process.
+type Panic struct {
+ // Message to include into the error.
+ Message string `json:"message"`
+}
+
+// ActivityParams maps activity command to activity params.
+func (cmd ExecuteActivity) ActivityParams(env bindings.WorkflowEnvironment, payloads *commonpb.Payloads) bindings.ExecuteActivityParams {
+ params := bindings.ExecuteActivityParams{
+ ExecuteActivityOptions: cmd.Options,
+ ActivityType: bindings.ActivityType{Name: cmd.Name},
+ Input: payloads,
+ }
+
+ if params.TaskQueueName == "" {
+ params.TaskQueueName = env.WorkflowInfo().TaskQueueName
+ }
+
+ return params
+}
+
+// WorkflowParams maps workflow command to workflow params.
+func (cmd ExecuteChildWorkflow) WorkflowParams(env bindings.WorkflowEnvironment, payloads *commonpb.Payloads) bindings.ExecuteWorkflowParams {
+ params := bindings.ExecuteWorkflowParams{
+ WorkflowOptions: cmd.Options,
+ WorkflowType: &bindings.WorkflowType{Name: cmd.Name},
+ Input: payloads,
+ }
+
+ if params.TaskQueueName == "" {
+ params.TaskQueueName = env.WorkflowInfo().TaskQueueName
+ }
+
+ return params
+}
+
+// ToDuration converts timer command to time.Duration.
+func (cmd NewTimer) ToDuration() time.Duration {
+ return time.Millisecond * time.Duration(cmd.Milliseconds)
+}
+
+// returns command name (only for the commands sent to the worker)
+func commandName(cmd interface{}) (string, error) {
+ const op = errors.Op("command_name")
+ switch cmd.(type) {
+ case GetWorkerInfo, *GetWorkerInfo:
+ return getWorkerInfoCommand, nil
+ case StartWorkflow, *StartWorkflow:
+ return startWorkflowCommand, nil
+ case InvokeSignal, *InvokeSignal:
+ return invokeSignalCommand, nil
+ case InvokeQuery, *InvokeQuery:
+ return invokeQueryCommand, nil
+ case DestroyWorkflow, *DestroyWorkflow:
+ return destroyWorkflowCommand, nil
+ case CancelWorkflow, *CancelWorkflow:
+ return cancelWorkflowCommand, nil
+ case GetStackTrace, *GetStackTrace:
+ return getStackTraceCommand, nil
+ case InvokeActivity, *InvokeActivity:
+ return invokeActivityCommand, nil
+ case ExecuteActivity, *ExecuteActivity:
+ return executeActivityCommand, nil
+ case ExecuteChildWorkflow, *ExecuteChildWorkflow:
+ return executeChildWorkflowCommand, nil
+ case GetChildWorkflowExecution, *GetChildWorkflowExecution:
+ return getChildWorkflowExecutionCommand, nil
+ case NewTimer, *NewTimer:
+ return newTimerCommand, nil
+ case GetVersion, *GetVersion:
+ return getVersionCommand, nil
+ case SideEffect, *SideEffect:
+ return sideEffectCommand, nil
+ case CompleteWorkflow, *CompleteWorkflow:
+ return completeWorkflowCommand, nil
+ case ContinueAsNew, *ContinueAsNew:
+ return continueAsNewCommand, nil
+ case SignalExternalWorkflow, *SignalExternalWorkflow:
+ return signalExternalWorkflowCommand, nil
+ case CancelExternalWorkflow, *CancelExternalWorkflow:
+ return cancelExternalWorkflowCommand, nil
+ case Cancel, *Cancel:
+ return cancelCommand, nil
+ case Panic, *Panic:
+ return panicCommand, nil
+ default:
+ return "", errors.E(op, errors.Errorf("undefined command type: %s", cmd))
+ }
+}
+
+// reads command from binary payload
+func initCommand(name string) (interface{}, error) {
+ const op = errors.Op("init_command")
+ switch name {
+ case getWorkerInfoCommand:
+ return &GetWorkerInfo{}, nil
+
+ case startWorkflowCommand:
+ return &StartWorkflow{}, nil
+
+ case invokeSignalCommand:
+ return &InvokeSignal{}, nil
+
+ case invokeQueryCommand:
+ return &InvokeQuery{}, nil
+
+ case destroyWorkflowCommand:
+ return &DestroyWorkflow{}, nil
+
+ case cancelWorkflowCommand:
+ return &CancelWorkflow{}, nil
+
+ case getStackTraceCommand:
+ return &GetStackTrace{}, nil
+
+ case invokeActivityCommand:
+ return &InvokeActivity{}, nil
+
+ case executeActivityCommand:
+ return &ExecuteActivity{}, nil
+
+ case executeChildWorkflowCommand:
+ return &ExecuteChildWorkflow{}, nil
+
+ case getChildWorkflowExecutionCommand:
+ return &GetChildWorkflowExecution{}, nil
+
+ case newTimerCommand:
+ return &NewTimer{}, nil
+
+ case getVersionCommand:
+ return &GetVersion{}, nil
+
+ case sideEffectCommand:
+ return &SideEffect{}, nil
+
+ case completeWorkflowCommand:
+ return &CompleteWorkflow{}, nil
+
+ case continueAsNewCommand:
+ return &ContinueAsNew{}, nil
+
+ case signalExternalWorkflowCommand:
+ return &SignalExternalWorkflow{}, nil
+
+ case cancelExternalWorkflowCommand:
+ return &CancelExternalWorkflow{}, nil
+
+ case cancelCommand:
+ return &Cancel{}, nil
+
+ case panicCommand:
+ return &Panic{}, nil
+
+ default:
+ return nil, errors.E(op, errors.Errorf("undefined command name: %s", name))
+ }
+}
diff --git a/plugins/temporal/protocol/proto_codec.go b/plugins/temporal/protocol/proto_codec.go
new file mode 100644
index 00000000..607fe0fe
--- /dev/null
+++ b/plugins/temporal/protocol/proto_codec.go
@@ -0,0 +1,145 @@
+package protocol
+
+import (
+ v1 "github.com/golang/protobuf/proto" //nolint:staticcheck
+ jsoniter "github.com/json-iterator/go"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/temporal/protocol/internal"
+ "google.golang.org/protobuf/proto"
+)
+
+type (
+ // ProtoCodec uses protobuf to exchange messages with underlying workers.
+ ProtoCodec struct {
+ }
+)
+
+// NewProtoCodec creates new Proto communication codec.
+func NewProtoCodec() Codec {
+ return &ProtoCodec{}
+}
+
+// WithLogger creates new codes instance with attached logger.
+func (c *ProtoCodec) WithLogger(logger logger.Logger) Codec {
+ return &ProtoCodec{}
+}
+
+// GetName returns codec name.
+func (c *ProtoCodec) GetName() string {
+ return "protobuf"
+}
+
+// Execute exchanges commands with worker.
+func (c *ProtoCodec) Execute(e Endpoint, ctx Context, msg ...Message) ([]Message, error) {
+ if len(msg) == 0 {
+ return nil, nil
+ }
+
+ var request = &internal.Frame{}
+ var response = &internal.Frame{}
+ var result = make([]Message, 0, 5)
+ var err error
+
+ for _, m := range msg {
+ frame, err := c.packMessage(m)
+ if err != nil {
+ return nil, err
+ }
+
+ request.Messages = append(request.Messages, frame)
+ }
+
+ p := payload.Payload{}
+
+ // context is always in json format
+ if ctx.IsEmpty() {
+ p.Context = []byte("null")
+ }
+
+ p.Context, err = jsoniter.Marshal(ctx)
+ if err != nil {
+ return nil, errors.E(errors.Op("encodeContext"), err)
+ }
+
+ p.Body, err = proto.Marshal(v1.MessageV2(request))
+ if err != nil {
+ return nil, errors.E(errors.Op("encodePayload"), err)
+ }
+
+ out, err := e.Exec(p)
+ if err != nil {
+ return nil, errors.E(errors.Op("execute"), err)
+ }
+
+ if len(out.Body) == 0 {
+ // worker inactive or closed
+ return nil, nil
+ }
+
+ err = proto.Unmarshal(out.Body, v1.MessageV2(response))
+ if err != nil {
+ return nil, errors.E(errors.Op("parseResponse"), err)
+ }
+
+ for _, f := range response.Messages {
+ msg, err := c.parseMessage(f)
+ if err != nil {
+ return nil, err
+ }
+
+ result = append(result, msg)
+ }
+
+ return result, nil
+}
+
+func (c *ProtoCodec) packMessage(msg Message) (*internal.Message, error) {
+ var err error
+
+ frame := &internal.Message{
+ Id: msg.ID,
+ Payloads: msg.Payloads,
+ Failure: msg.Failure,
+ }
+
+ if msg.Command != nil {
+ frame.Command, err = commandName(msg.Command)
+ if err != nil {
+ return nil, err
+ }
+
+ frame.Options, err = jsoniter.Marshal(msg.Command)
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ return frame, nil
+}
+
+func (c *ProtoCodec) parseMessage(frame *internal.Message) (Message, error) {
+ const op = errors.Op("proto_codec_parse_message")
+ var err error
+
+ msg := Message{
+ ID: frame.Id,
+ Payloads: frame.Payloads,
+ Failure: frame.Failure,
+ }
+
+ if frame.Command != "" {
+ msg.Command, err = initCommand(frame.Command)
+ if err != nil {
+ return Message{}, errors.E(op, err)
+ }
+
+ err = jsoniter.Unmarshal(frame.Options, &msg.Command)
+ if err != nil {
+ return Message{}, errors.E(op, err)
+ }
+ }
+
+ return msg, nil
+}
diff --git a/plugins/temporal/protocol/protocol.go b/plugins/temporal/protocol/protocol.go
new file mode 100644
index 00000000..53076fdf
--- /dev/null
+++ b/plugins/temporal/protocol/protocol.go
@@ -0,0 +1,77 @@
+package protocol
+
+import (
+ "github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ commonpb "go.temporal.io/api/common/v1"
+ "go.temporal.io/api/failure/v1"
+)
+
+const (
+ // DebugNone disables all debug messages.
+ DebugNone = iota
+
+ // DebugNormal renders all messages into console.
+ DebugNormal
+
+ // DebugHumanized enables color highlights for messages.
+ DebugHumanized
+)
+
+// Context provides worker information about currently. Context can be empty for server level commands.
+type Context struct {
+ // TaskQueue associates message batch with the specific task queue in underlying worker.
+ TaskQueue string `json:"taskQueue,omitempty"`
+
+ // TickTime associated current or historical time with message batch.
+ TickTime string `json:"tickTime,omitempty"`
+
+ // Replay indicates that current message batch is historical.
+ Replay bool `json:"replay,omitempty"`
+}
+
+// Message used to exchange the send commands and receive responses from underlying workers.
+type Message struct {
+ // ID contains ID of the command, response or error.
+ ID uint64 `json:"id"`
+
+ // Command of the message in unmarshalled form. Pointer.
+ Command interface{} `json:"command,omitempty"`
+
+ // Failure associated with command id.
+ Failure *failure.Failure `json:"failure,omitempty"`
+
+ // Payloads contains message specific payloads in binary format.
+ Payloads *commonpb.Payloads `json:"payloads,omitempty"`
+}
+
+// Codec manages payload encoding and decoding while communication with underlying worker.
+type Codec interface {
+ // WithLogger creates new codes instance with attached logger.
+ WithLogger(logger.Logger) Codec
+
+ // GetName returns codec name.
+ GetName() string
+
+ // Execute sends message to worker and waits for the response.
+ Execute(e Endpoint, ctx Context, msg ...Message) ([]Message, error)
+}
+
+// Endpoint provides the ability to send and receive messages.
+type Endpoint interface {
+ // ExecWithContext allow to set ExecTTL
+ Exec(p payload.Payload) (payload.Payload, error)
+}
+
+// DebugLevel configures debug level.
+type DebugLevel int
+
+// IsEmpty only check if task queue set.
+func (ctx Context) IsEmpty() bool {
+ return ctx.TaskQueue == ""
+}
+
+// IsCommand returns true if message carries request.
+func (msg Message) IsCommand() bool {
+ return msg.Command != nil
+}
diff --git a/plugins/temporal/protocol/worker_info.go b/plugins/temporal/protocol/worker_info.go
new file mode 100644
index 00000000..58a0ae66
--- /dev/null
+++ b/plugins/temporal/protocol/worker_info.go
@@ -0,0 +1,72 @@
+package protocol
+
+import (
+ "github.com/spiral/errors"
+ "go.temporal.io/sdk/converter"
+ "go.temporal.io/sdk/worker"
+)
+
+// WorkerInfo outlines information about every available worker and it's TaskQueues.
+
+// WorkerInfo lists available task queues, workflows and activities.
+type WorkerInfo struct {
+ // TaskQueue assigned to the worker.
+ TaskQueue string `json:"taskQueue"`
+
+ // Options describe worker options.
+ Options worker.Options `json:"options,omitempty"`
+
+ // Workflows provided by the worker.
+ Workflows []WorkflowInfo
+
+ // Activities provided by the worker.
+ Activities []ActivityInfo
+}
+
+// WorkflowInfo describes single worker workflow.
+type WorkflowInfo struct {
+ // Name of the workflow.
+ Name string `json:"name"`
+
+ // Queries pre-defined for the workflow type.
+ Queries []string `json:"queries"`
+
+ // Signals pre-defined for the workflow type.
+ Signals []string `json:"signals"`
+}
+
+// ActivityInfo describes single worker activity.
+type ActivityInfo struct {
+ // Name describes public activity name.
+ Name string `json:"name"`
+}
+
+// FetchWorkerInfo fetches information about all underlying workers (can be multiplexed inside single process).
+func FetchWorkerInfo(c Codec, e Endpoint, dc converter.DataConverter) ([]WorkerInfo, error) {
+ const op = errors.Op("fetch_worker_info")
+
+ result, err := c.Execute(e, Context{}, Message{ID: 0, Command: GetWorkerInfo{}})
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ if len(result) != 1 {
+ return nil, errors.E(op, errors.Str("unable to read worker info"))
+ }
+
+ if result[0].ID != 0 {
+ return nil, errors.E(op, errors.Str("FetchWorkerInfo confirmation missing"))
+ }
+
+ var info []WorkerInfo
+ for i := range result[0].Payloads.Payloads {
+ wi := WorkerInfo{}
+ if err := dc.FromPayload(result[0].Payloads.Payloads[i], &wi); err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ info = append(info, wi)
+ }
+
+ return info, nil
+}
diff --git a/plugins/temporal/workflow/canceller.go b/plugins/temporal/workflow/canceller.go
new file mode 100644
index 00000000..962c527f
--- /dev/null
+++ b/plugins/temporal/workflow/canceller.go
@@ -0,0 +1,41 @@
+package workflow
+
+import (
+ "sync"
+)
+
+type cancellable func() error
+
+type canceller struct {
+ ids sync.Map
+}
+
+func (c *canceller) register(id uint64, cancel cancellable) {
+ c.ids.Store(id, cancel)
+}
+
+func (c *canceller) discard(id uint64) {
+ c.ids.Delete(id)
+}
+
+func (c *canceller) cancel(ids ...uint64) error {
+ var err error
+ for _, id := range ids {
+ cancel, ok := c.ids.Load(id)
+ if ok == false {
+ continue
+ }
+
+ // TODO return when minimum supported version will be go 1.15
+ // go1.14 don't have LoadAndDelete method
+ // It was introduced only in go1.15
+ c.ids.Delete(id)
+
+ err = cancel.(cancellable)()
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
diff --git a/plugins/temporal/workflow/canceller_test.go b/plugins/temporal/workflow/canceller_test.go
new file mode 100644
index 00000000..d6e846f8
--- /dev/null
+++ b/plugins/temporal/workflow/canceller_test.go
@@ -0,0 +1,33 @@
+package workflow
+
+import (
+ "errors"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func Test_CancellerNoListeners(t *testing.T) {
+ c := &canceller{}
+
+ assert.NoError(t, c.cancel(1))
+}
+
+func Test_CancellerListenerError(t *testing.T) {
+ c := &canceller{}
+ c.register(1, func() error {
+ return errors.New("failed")
+ })
+
+ assert.Error(t, c.cancel(1))
+}
+
+func Test_CancellerListenerDiscarded(t *testing.T) {
+ c := &canceller{}
+ c.register(1, func() error {
+ return errors.New("failed")
+ })
+
+ c.discard(1)
+ assert.NoError(t, c.cancel(1))
+}
diff --git a/plugins/temporal/workflow/id_registry.go b/plugins/temporal/workflow/id_registry.go
new file mode 100644
index 00000000..ac75cbda
--- /dev/null
+++ b/plugins/temporal/workflow/id_registry.go
@@ -0,0 +1,51 @@
+package workflow
+
+import (
+ "sync"
+
+ bindings "go.temporal.io/sdk/internalbindings"
+)
+
+// used to gain access to child workflow ids after they become available via callback result.
+type idRegistry struct {
+ mu sync.Mutex
+ ids map[uint64]entry
+ listeners map[uint64]listener
+}
+
+type listener func(w bindings.WorkflowExecution, err error)
+
+type entry struct {
+ w bindings.WorkflowExecution
+ err error
+}
+
+func newIDRegistry() *idRegistry {
+ return &idRegistry{
+ ids: map[uint64]entry{},
+ listeners: map[uint64]listener{},
+ }
+}
+
+func (c *idRegistry) listen(id uint64, cl listener) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ c.listeners[id] = cl
+
+ if e, ok := c.ids[id]; ok {
+ cl(e.w, e.err)
+ }
+}
+
+func (c *idRegistry) push(id uint64, w bindings.WorkflowExecution, err error) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ e := entry{w: w, err: err}
+ c.ids[id] = e
+
+ if l, ok := c.listeners[id]; ok {
+ l(e.w, e.err)
+ }
+}
diff --git a/plugins/temporal/workflow/message_queue.go b/plugins/temporal/workflow/message_queue.go
new file mode 100644
index 00000000..8f4409d1
--- /dev/null
+++ b/plugins/temporal/workflow/message_queue.go
@@ -0,0 +1,47 @@
+package workflow
+
+import (
+ rrt "github.com/spiral/roadrunner/v2/plugins/temporal/protocol"
+ "go.temporal.io/api/common/v1"
+ "go.temporal.io/api/failure/v1"
+)
+
+type messageQueue struct {
+ seqID func() uint64
+ queue []rrt.Message
+}
+
+func newMessageQueue(sedID func() uint64) *messageQueue {
+ return &messageQueue{
+ seqID: sedID,
+ queue: make([]rrt.Message, 0, 5),
+ }
+}
+
+func (mq *messageQueue) flush() {
+ mq.queue = mq.queue[0:0]
+}
+
+func (mq *messageQueue) allocateMessage(cmd interface{}, payloads *common.Payloads) (uint64, rrt.Message) {
+ msg := rrt.Message{
+ ID: mq.seqID(),
+ Command: cmd,
+ Payloads: payloads,
+ }
+
+ return msg.ID, msg
+}
+
+func (mq *messageQueue) pushCommand(cmd interface{}, payloads *common.Payloads) uint64 {
+ id, msg := mq.allocateMessage(cmd, payloads)
+ mq.queue = append(mq.queue, msg)
+ return id
+}
+
+func (mq *messageQueue) pushResponse(id uint64, payloads *common.Payloads) {
+ mq.queue = append(mq.queue, rrt.Message{ID: id, Payloads: payloads})
+}
+
+func (mq *messageQueue) pushError(id uint64, failure *failure.Failure) {
+ mq.queue = append(mq.queue, rrt.Message{ID: id, Failure: failure})
+}
diff --git a/plugins/temporal/workflow/message_queue_test.go b/plugins/temporal/workflow/message_queue_test.go
new file mode 100644
index 00000000..1fcd409f
--- /dev/null
+++ b/plugins/temporal/workflow/message_queue_test.go
@@ -0,0 +1,53 @@
+package workflow
+
+import (
+ "sync/atomic"
+ "testing"
+
+ "github.com/spiral/roadrunner/v2/plugins/temporal/protocol"
+ "github.com/stretchr/testify/assert"
+ "go.temporal.io/api/common/v1"
+ "go.temporal.io/api/failure/v1"
+)
+
+func Test_MessageQueueFlushError(t *testing.T) {
+ var index uint64
+ mq := newMessageQueue(func() uint64 {
+ return atomic.AddUint64(&index, 1)
+ })
+
+ mq.pushError(1, &failure.Failure{})
+ assert.Len(t, mq.queue, 1)
+
+ mq.flush()
+ assert.Len(t, mq.queue, 0)
+ assert.Equal(t, uint64(0), index)
+}
+
+func Test_MessageQueueFlushResponse(t *testing.T) {
+ var index uint64
+ mq := newMessageQueue(func() uint64 {
+ return atomic.AddUint64(&index, 1)
+ })
+
+ mq.pushResponse(1, &common.Payloads{})
+ assert.Len(t, mq.queue, 1)
+
+ mq.flush()
+ assert.Len(t, mq.queue, 0)
+ assert.Equal(t, uint64(0), index)
+}
+
+func Test_MessageQueueCommandID(t *testing.T) {
+ var index uint64
+ mq := newMessageQueue(func() uint64 {
+ return atomic.AddUint64(&index, 1)
+ })
+
+ n := mq.pushCommand(protocol.StartWorkflow{}, &common.Payloads{})
+ assert.Equal(t, n, index)
+ assert.Len(t, mq.queue, 1)
+
+ mq.flush()
+ assert.Len(t, mq.queue, 0)
+}
diff --git a/plugins/temporal/workflow/plugin.go b/plugins/temporal/workflow/plugin.go
new file mode 100644
index 00000000..572d9a3b
--- /dev/null
+++ b/plugins/temporal/workflow/plugin.go
@@ -0,0 +1,203 @@
+package workflow
+
+import (
+ "context"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/cenkalti/backoff/v4"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/server"
+ "github.com/spiral/roadrunner/v2/plugins/temporal/client"
+)
+
+const (
+ // PluginName defines public service name.
+ PluginName = "workflows"
+
+ // RRMode sets as RR_MODE env variable to let worker know about the mode to run.
+ RRMode = "temporal/workflow"
+)
+
+// Plugin manages workflows and workers.
+type Plugin struct {
+ temporal client.Temporal
+ events events.Handler
+ server server.Server
+ log logger.Logger
+ mu sync.Mutex
+ reset chan struct{}
+ pool workflowPool
+ closing int64
+}
+
+// Init workflow plugin.
+func (p *Plugin) Init(temporal client.Temporal, server server.Server, log logger.Logger) error {
+ p.temporal = temporal
+ p.server = server
+ p.events = events.NewEventsHandler()
+ p.log = log
+ p.reset = make(chan struct{}, 1)
+
+ return nil
+}
+
+// Serve starts workflow service.
+func (p *Plugin) Serve() chan error {
+ const op = errors.Op("workflow_plugin_serve")
+ errCh := make(chan error, 1)
+
+ pool, err := p.startPool()
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
+
+ p.pool = pool
+
+ go func() {
+ for {
+ select {
+ case <-p.reset:
+ if atomic.LoadInt64(&p.closing) == 1 {
+ return
+ }
+
+ err := p.replacePool()
+ if err == nil {
+ continue
+ }
+
+ bkoff := backoff.NewExponentialBackOff()
+ bkoff.InitialInterval = time.Second
+
+ err = backoff.Retry(p.replacePool, bkoff)
+ if err != nil {
+ errCh <- errors.E(op, err)
+ }
+ }
+ }
+ }()
+
+ return errCh
+}
+
+// Stop workflow service.
+func (p *Plugin) Stop() error {
+ const op = errors.Op("workflow_plugin_stop")
+ atomic.StoreInt64(&p.closing, 1)
+
+ pool := p.getPool()
+ if pool != nil {
+ p.pool = nil
+ err := pool.Destroy(context.Background())
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+ }
+
+ return nil
+}
+
+// Name of the service.
+func (p *Plugin) Name() string {
+ return PluginName
+}
+
+// Workers returns list of available workflow workers.
+func (p *Plugin) Workers() []worker.BaseProcess {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ return p.pool.Workers()
+}
+
+// WorkflowNames returns list of all available workflows.
+func (p *Plugin) WorkflowNames() []string {
+ return p.pool.WorkflowNames()
+}
+
+// Reset resets underlying workflow pool with new copy.
+func (p *Plugin) Reset() error {
+ p.reset <- struct{}{}
+
+ return nil
+}
+
+// AddListener adds event listeners to the service.
+func (p *Plugin) poolListener(event interface{}) {
+ if ev, ok := event.(PoolEvent); ok {
+ if ev.Event == eventWorkerExit {
+ if ev.Caused != nil {
+ p.log.Error("Workflow pool error", "error", ev.Caused)
+ }
+ p.reset <- struct{}{}
+ }
+ }
+
+ p.events.Push(event)
+}
+
+// AddListener adds event listeners to the service.
+func (p *Plugin) startPool() (workflowPool, error) {
+ const op = errors.Op("workflow_plugin_start_pool")
+ pool, err := newWorkflowPool(
+ p.temporal.GetCodec().WithLogger(p.log),
+ p.poolListener,
+ p.server,
+ )
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ err = pool.Start(context.Background(), p.temporal)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ p.log.Debug("Started workflow processing", "workflows", pool.WorkflowNames())
+
+ return pool, nil
+}
+
+func (p *Plugin) replacePool() error {
+ p.mu.Lock()
+ const op = errors.Op("workflow_plugin_replace_pool")
+ defer p.mu.Unlock()
+
+ if p.pool != nil {
+ err := p.pool.Destroy(context.Background())
+ p.pool = nil
+ if err != nil {
+ p.log.Error(
+ "Unable to destroy expired workflow pool",
+ "error",
+ errors.E(op, err),
+ )
+ return errors.E(op, err)
+ }
+ }
+
+ pool, err := p.startPool()
+ if err != nil {
+ p.log.Error("Replace workflow pool failed", "error", err)
+ return errors.E(op, err)
+ }
+
+ p.pool = pool
+ p.log.Debug("workflow pool successfully replaced")
+
+ return nil
+}
+
+// getPool returns currently pool.
+func (p *Plugin) getPool() workflowPool {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+
+ return p.pool
+}
diff --git a/plugins/temporal/workflow/process.go b/plugins/temporal/workflow/process.go
new file mode 100644
index 00000000..45e6885c
--- /dev/null
+++ b/plugins/temporal/workflow/process.go
@@ -0,0 +1,436 @@
+package workflow
+
+import (
+ "strconv"
+ "sync/atomic"
+ "time"
+
+ "github.com/spiral/errors"
+ rrt "github.com/spiral/roadrunner/v2/plugins/temporal/protocol"
+ commonpb "go.temporal.io/api/common/v1"
+ bindings "go.temporal.io/sdk/internalbindings"
+ "go.temporal.io/sdk/workflow"
+)
+
+// wraps single workflow process
+type workflowProcess struct {
+ codec rrt.Codec
+ pool workflowPool
+ env bindings.WorkflowEnvironment
+ header *commonpb.Header
+ mq *messageQueue
+ ids *idRegistry
+ seqID uint64
+ runID string
+ pipeline []rrt.Message
+ callbacks []func() error
+ canceller *canceller
+ inLoop bool
+}
+
+// Execute workflow, bootstraps process.
+func (wf *workflowProcess) Execute(env bindings.WorkflowEnvironment, header *commonpb.Header, input *commonpb.Payloads) {
+ wf.env = env
+ wf.header = header
+ wf.seqID = 0
+ wf.runID = env.WorkflowInfo().WorkflowExecution.RunID
+ wf.canceller = &canceller{}
+
+ // sequenceID shared for all worker workflows
+ wf.mq = newMessageQueue(wf.pool.SeqID)
+ wf.ids = newIDRegistry()
+
+ env.RegisterCancelHandler(wf.handleCancel)
+ env.RegisterSignalHandler(wf.handleSignal)
+ env.RegisterQueryHandler(wf.handleQuery)
+
+ var (
+ lastCompletion = bindings.GetLastCompletionResult(env)
+ lastCompletionOffset = 0
+ )
+
+ if lastCompletion != nil && len(lastCompletion.Payloads) != 0 {
+ if input == nil {
+ input = &commonpb.Payloads{Payloads: []*commonpb.Payload{}}
+ }
+
+ input.Payloads = append(input.Payloads, lastCompletion.Payloads...)
+ lastCompletionOffset = len(lastCompletion.Payloads)
+ }
+
+ _ = wf.mq.pushCommand(
+ rrt.StartWorkflow{
+ Info: env.WorkflowInfo(),
+ LastCompletion: lastCompletionOffset,
+ },
+ input,
+ )
+}
+
+// OnWorkflowTaskStarted handles single workflow tick and batch of pipeline from temporal server.
+func (wf *workflowProcess) OnWorkflowTaskStarted() {
+ wf.inLoop = true
+ defer func() { wf.inLoop = false }()
+
+ var err error
+ for _, callback := range wf.callbacks {
+ err = callback()
+ if err != nil {
+ panic(err)
+ }
+ }
+ wf.callbacks = nil
+
+ if err := wf.flushQueue(); err != nil {
+ panic(err)
+ }
+
+ for len(wf.pipeline) > 0 {
+ msg := wf.pipeline[0]
+ wf.pipeline = wf.pipeline[1:]
+
+ if msg.IsCommand() {
+ err = wf.handleMessage(msg)
+ }
+
+ if err != nil {
+ panic(err)
+ }
+ }
+}
+
+// StackTrace renders workflow stack trace.
+func (wf *workflowProcess) StackTrace() string {
+ result, err := wf.runCommand(
+ rrt.GetStackTrace{
+ RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID,
+ },
+ nil,
+ )
+
+ if err != nil {
+ return err.Error()
+ }
+
+ var stacktrace string
+ err = wf.env.GetDataConverter().FromPayload(result.Payloads.Payloads[0], &stacktrace)
+ if err != nil {
+ return err.Error()
+ }
+
+ return stacktrace
+}
+
+// Close the workflow.
+func (wf *workflowProcess) Close() {
+ // TODO: properly handle errors
+ // panic(err)
+
+ _ = wf.mq.pushCommand(
+ rrt.DestroyWorkflow{RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID},
+ nil,
+ )
+
+ _, _ = wf.discardQueue()
+}
+
+// execution context.
+func (wf *workflowProcess) getContext() rrt.Context {
+ return rrt.Context{
+ TaskQueue: wf.env.WorkflowInfo().TaskQueueName,
+ TickTime: wf.env.Now().Format(time.RFC3339),
+ Replay: wf.env.IsReplaying(),
+ }
+}
+
+// schedule cancel command
+func (wf *workflowProcess) handleCancel() {
+ _ = wf.mq.pushCommand(
+ rrt.CancelWorkflow{RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID},
+ nil,
+ )
+}
+
+// schedule the signal processing
+func (wf *workflowProcess) handleSignal(name string, input *commonpb.Payloads) {
+ _ = wf.mq.pushCommand(
+ rrt.InvokeSignal{
+ RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID,
+ Name: name,
+ },
+ input,
+ )
+}
+
+// Handle query in blocking mode.
+func (wf *workflowProcess) handleQuery(queryType string, queryArgs *commonpb.Payloads) (*commonpb.Payloads, error) {
+ result, err := wf.runCommand(
+ rrt.InvokeQuery{
+ RunID: wf.runID,
+ Name: queryType,
+ },
+ queryArgs,
+ )
+
+ if err != nil {
+ return nil, err
+ }
+
+ if result.Failure != nil {
+ return nil, bindings.ConvertFailureToError(result.Failure, wf.env.GetDataConverter())
+ }
+
+ return result.Payloads, nil
+}
+
+// process incoming command
+func (wf *workflowProcess) handleMessage(msg rrt.Message) error {
+ const op = errors.Op("handleMessage")
+ var err error
+
+ var (
+ id = msg.ID
+ cmd = msg.Command
+ payloads = msg.Payloads
+ )
+
+ switch cmd := cmd.(type) {
+ case *rrt.ExecuteActivity:
+ params := cmd.ActivityParams(wf.env, payloads)
+ activityID := wf.env.ExecuteActivity(params, wf.createCallback(id))
+
+ wf.canceller.register(id, func() error {
+ wf.env.RequestCancelActivity(activityID)
+ return nil
+ })
+
+ case *rrt.ExecuteChildWorkflow:
+ params := cmd.WorkflowParams(wf.env, payloads)
+
+ // always use deterministic id
+ if params.WorkflowID == "" {
+ nextID := atomic.AddUint64(&wf.seqID, 1)
+ params.WorkflowID = wf.env.WorkflowInfo().WorkflowExecution.RunID + "_" + strconv.Itoa(int(nextID))
+ }
+
+ wf.env.ExecuteChildWorkflow(params, wf.createCallback(id), func(r bindings.WorkflowExecution, e error) {
+ wf.ids.push(id, r, e)
+ })
+
+ wf.canceller.register(id, func() error {
+ wf.env.RequestCancelChildWorkflow(params.Namespace, params.WorkflowID)
+ return nil
+ })
+
+ case *rrt.GetChildWorkflowExecution:
+ wf.ids.listen(cmd.ID, func(w bindings.WorkflowExecution, err error) {
+ cl := wf.createCallback(id)
+
+ // TODO rewrite
+ if err != nil {
+ panic(err)
+ }
+
+ p, err := wf.env.GetDataConverter().ToPayloads(w)
+ if err != nil {
+ panic(err)
+ }
+
+ cl(p, err)
+ })
+
+ case *rrt.NewTimer:
+ timerID := wf.env.NewTimer(cmd.ToDuration(), wf.createCallback(id))
+ wf.canceller.register(id, func() error {
+ if timerID != nil {
+ wf.env.RequestCancelTimer(*timerID)
+ }
+ return nil
+ })
+
+ case *rrt.GetVersion:
+ version := wf.env.GetVersion(
+ cmd.ChangeID,
+ workflow.Version(cmd.MinSupported),
+ workflow.Version(cmd.MaxSupported),
+ )
+
+ result, err := wf.env.GetDataConverter().ToPayloads(version)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ wf.mq.pushResponse(id, result)
+ err = wf.flushQueue()
+ if err != nil {
+ panic(err)
+ }
+
+ case *rrt.SideEffect:
+ wf.env.SideEffect(
+ func() (*commonpb.Payloads, error) {
+ return payloads, nil
+ },
+ wf.createContinuableCallback(id),
+ )
+
+ case *rrt.CompleteWorkflow:
+ result, _ := wf.env.GetDataConverter().ToPayloads("completed")
+ wf.mq.pushResponse(id, result)
+
+ if msg.Failure == nil {
+ wf.env.Complete(payloads, nil)
+ } else {
+ wf.env.Complete(nil, bindings.ConvertFailureToError(msg.Failure, wf.env.GetDataConverter()))
+ }
+
+ case *rrt.ContinueAsNew:
+ result, _ := wf.env.GetDataConverter().ToPayloads("completed")
+ wf.mq.pushResponse(id, result)
+
+ wf.env.Complete(nil, &workflow.ContinueAsNewError{
+ WorkflowType: &bindings.WorkflowType{Name: cmd.Name},
+ Input: payloads,
+ Header: wf.header,
+ TaskQueueName: cmd.Options.TaskQueueName,
+ WorkflowExecutionTimeout: cmd.Options.WorkflowExecutionTimeout,
+ WorkflowRunTimeout: cmd.Options.WorkflowRunTimeout,
+ WorkflowTaskTimeout: cmd.Options.WorkflowTaskTimeout,
+ })
+
+ case *rrt.SignalExternalWorkflow:
+ wf.env.SignalExternalWorkflow(
+ cmd.Namespace,
+ cmd.WorkflowID,
+ cmd.RunID,
+ cmd.Signal,
+ payloads,
+ nil,
+ cmd.ChildWorkflowOnly,
+ wf.createCallback(id),
+ )
+
+ case *rrt.CancelExternalWorkflow:
+ wf.env.RequestCancelExternalWorkflow(cmd.Namespace, cmd.WorkflowID, cmd.RunID, wf.createCallback(id))
+
+ case *rrt.Cancel:
+ err = wf.canceller.cancel(cmd.CommandIDs...)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ result, _ := wf.env.GetDataConverter().ToPayloads("completed")
+ wf.mq.pushResponse(id, result)
+
+ err = wf.flushQueue()
+ if err != nil {
+ panic(err)
+ }
+
+ case *rrt.Panic:
+ panic(errors.E(cmd.Message))
+
+ default:
+ panic("undefined command")
+ }
+
+ return nil
+}
+
+func (wf *workflowProcess) createCallback(id uint64) bindings.ResultHandler {
+ callback := func(result *commonpb.Payloads, err error) error {
+ wf.canceller.discard(id)
+
+ if err != nil {
+ wf.mq.pushError(id, bindings.ConvertErrorToFailure(err, wf.env.GetDataConverter()))
+ return nil
+ }
+
+ // fetch original payload
+ wf.mq.pushResponse(id, result)
+ return nil
+ }
+
+ return func(result *commonpb.Payloads, err error) {
+ // timer cancel callback can happen inside the loop
+ if wf.inLoop {
+ err := callback(result, err)
+ if err != nil {
+ panic(err)
+ }
+
+ return
+ }
+
+ wf.callbacks = append(wf.callbacks, func() error {
+ return callback(result, err)
+ })
+ }
+}
+
+// callback to be called inside the queue processing, adds new messages at the end of the queue
+func (wf *workflowProcess) createContinuableCallback(id uint64) bindings.ResultHandler {
+ callback := func(result *commonpb.Payloads, err error) {
+ wf.canceller.discard(id)
+
+ if err != nil {
+ wf.mq.pushError(id, bindings.ConvertErrorToFailure(err, wf.env.GetDataConverter()))
+ return
+ }
+
+ wf.mq.pushResponse(id, result)
+ err = wf.flushQueue()
+ if err != nil {
+ panic(err)
+ }
+ }
+
+ return func(result *commonpb.Payloads, err error) {
+ callback(result, err)
+ }
+}
+
+// Exchange messages between host and worker processes and add new commands to the queue.
+func (wf *workflowProcess) flushQueue() error {
+ const op = errors.Op("flush queue")
+ messages, err := wf.codec.Execute(wf.pool, wf.getContext(), wf.mq.queue...)
+ wf.mq.flush()
+
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ wf.pipeline = append(wf.pipeline, messages...)
+
+ return nil
+}
+
+// Exchange messages between host and worker processes without adding new commands to the queue.
+func (wf *workflowProcess) discardQueue() ([]rrt.Message, error) {
+ const op = errors.Op("discard queue")
+ messages, err := wf.codec.Execute(wf.pool, wf.getContext(), wf.mq.queue...)
+ wf.mq.flush()
+
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ return messages, nil
+}
+
+// Run single command and return single result.
+func (wf *workflowProcess) runCommand(cmd interface{}, payloads *commonpb.Payloads) (rrt.Message, error) {
+ const op = errors.Op("workflow_process_runcommand")
+ _, msg := wf.mq.allocateMessage(cmd, payloads)
+
+ result, err := wf.codec.Execute(wf.pool, wf.getContext(), msg)
+ if err != nil {
+ return rrt.Message{}, errors.E(op, err)
+ }
+
+ if len(result) != 1 {
+ return rrt.Message{}, errors.E(op, errors.Str("unexpected worker response"))
+ }
+
+ return result[0], nil
+}
diff --git a/plugins/temporal/workflow/workflow_pool.go b/plugins/temporal/workflow/workflow_pool.go
new file mode 100644
index 00000000..b9ed46c8
--- /dev/null
+++ b/plugins/temporal/workflow/workflow_pool.go
@@ -0,0 +1,190 @@
+package workflow
+
+import (
+ "context"
+ "sync"
+ "sync/atomic"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
+ rrWorker "github.com/spiral/roadrunner/v2/pkg/worker"
+ "github.com/spiral/roadrunner/v2/plugins/server"
+ "github.com/spiral/roadrunner/v2/plugins/temporal/client"
+ rrt "github.com/spiral/roadrunner/v2/plugins/temporal/protocol"
+ bindings "go.temporal.io/sdk/internalbindings"
+ "go.temporal.io/sdk/worker"
+ "go.temporal.io/sdk/workflow"
+)
+
+const eventWorkerExit = 8390
+
+// RR_MODE env variable key
+const RR_MODE = "RR_MODE" //nolint
+
+// RR_CODEC env variable key
+const RR_CODEC = "RR_CODEC" //nolint
+
+type workflowPool interface {
+ SeqID() uint64
+ Exec(p payload.Payload) (payload.Payload, error)
+ Start(ctx context.Context, temporal client.Temporal) error
+ Destroy(ctx context.Context) error
+ Workers() []rrWorker.BaseProcess
+ WorkflowNames() []string
+}
+
+// PoolEvent triggered on workflow pool worker events.
+type PoolEvent struct {
+ Event int
+ Context interface{}
+ Caused error
+}
+
+// workflowPoolImpl manages workflowProcess executions between worker restarts.
+type workflowPoolImpl struct {
+ codec rrt.Codec
+ seqID uint64
+ workflows map[string]rrt.WorkflowInfo
+ tWorkers []worker.Worker
+ mu sync.Mutex
+ worker rrWorker.SyncWorker
+ active bool
+}
+
+// newWorkflowPool creates new workflow pool.
+func newWorkflowPool(codec rrt.Codec, listener events.Listener, factory server.Server) (workflowPool, error) {
+ const op = errors.Op("new_workflow_pool")
+ w, err := factory.NewWorker(
+ context.Background(),
+ map[string]string{RR_MODE: RRMode, RR_CODEC: codec.GetName()},
+ listener,
+ )
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ go func() {
+ err := w.Wait()
+ listener(PoolEvent{Event: eventWorkerExit, Caused: err})
+ }()
+
+ return &workflowPoolImpl{codec: codec, worker: rrWorker.From(w)}, nil
+}
+
+// Start the pool in non blocking mode.
+func (pool *workflowPoolImpl) Start(ctx context.Context, temporal client.Temporal) error {
+ const op = errors.Op("workflow_pool_start")
+ pool.mu.Lock()
+ pool.active = true
+ pool.mu.Unlock()
+
+ err := pool.initWorkers(ctx, temporal)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ for i := 0; i < len(pool.tWorkers); i++ {
+ err := pool.tWorkers[i].Start()
+ if err != nil {
+ return errors.E(op, err)
+ }
+ }
+
+ return nil
+}
+
+// Active.
+func (pool *workflowPoolImpl) Active() bool {
+ return pool.active
+}
+
+// Destroy stops all temporal workers and application worker.
+func (pool *workflowPoolImpl) Destroy(ctx context.Context) error {
+ pool.mu.Lock()
+ defer pool.mu.Unlock()
+ const op = errors.Op("workflow_pool_destroy")
+
+ pool.active = false
+ for i := 0; i < len(pool.tWorkers); i++ {
+ pool.tWorkers[i].Stop()
+ }
+
+ worker.PurgeStickyWorkflowCache()
+
+ err := pool.worker.Stop()
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+}
+
+// NewWorkflowDefinition initiates new workflow process.
+func (pool *workflowPoolImpl) NewWorkflowDefinition() bindings.WorkflowDefinition {
+ return &workflowProcess{
+ codec: pool.codec,
+ pool: pool,
+ }
+}
+
+// NewWorkflowDefinition initiates new workflow process.
+func (pool *workflowPoolImpl) SeqID() uint64 {
+ return atomic.AddUint64(&pool.seqID, 1)
+}
+
+// Exec set of commands in thread safe move.
+func (pool *workflowPoolImpl) Exec(p payload.Payload) (payload.Payload, error) {
+ pool.mu.Lock()
+ defer pool.mu.Unlock()
+
+ if !pool.active {
+ return payload.Payload{}, nil
+ }
+
+ return pool.worker.Exec(p)
+}
+
+func (pool *workflowPoolImpl) Workers() []rrWorker.BaseProcess {
+ return []rrWorker.BaseProcess{pool.worker}
+}
+
+func (pool *workflowPoolImpl) WorkflowNames() []string {
+ names := make([]string, 0, len(pool.workflows))
+ for name := range pool.workflows {
+ names = append(names, name)
+ }
+
+ return names
+}
+
+// initWorkers request workers workflows from underlying PHP and configures temporal workers linked to the pool.
+func (pool *workflowPoolImpl) initWorkers(ctx context.Context, temporal client.Temporal) error {
+ const op = errors.Op("workflow_pool_init_workers")
+ workerInfo, err := rrt.FetchWorkerInfo(pool.codec, pool, temporal.GetDataConverter())
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ pool.workflows = make(map[string]rrt.WorkflowInfo)
+ pool.tWorkers = make([]worker.Worker, 0)
+
+ for _, info := range workerInfo {
+ w, err := temporal.CreateWorker(info.TaskQueue, info.Options)
+ if err != nil {
+ return errors.E(op, err, pool.Destroy(ctx))
+ }
+
+ pool.tWorkers = append(pool.tWorkers, w)
+ for _, workflowInfo := range info.Workflows {
+ w.RegisterWorkflowWithOptions(pool, workflow.RegisterOptions{
+ Name: workflowInfo.Name,
+ DisableAlreadyRegisteredCheck: false,
+ })
+
+ pool.workflows[workflowInfo.Name] = workflowInfo
+ }
+ }
+
+ return nil
+}
diff --git a/tests/composer.json b/tests/composer.json
index 889b6808..060f105e 100644
--- a/tests/composer.json
+++ b/tests/composer.json
@@ -1,14 +1,15 @@
{
- "minimum-stability": "beta",
+ "minimum-stability": "dev",
"prefer-stable": true,
"require": {
"nyholm/psr7": "^1.3",
"spiral/roadrunner": "^2.0",
- "spiral/roadrunner-http": "^2.0"
+ "spiral/roadrunner-http": "^2.0",
+ "temporal/sdk": "dev-master"
},
"autoload": {
"psr-4": {
- "Spiral\\RoadRunner\\": "src/"
+ "Temporal\\Tests\\": "temporal"
}
}
}
diff --git a/tests/docker-compose.yaml b/tests/docker-compose.yaml
index fd1a48bf..fa1070e1 100644
--- a/tests/docker-compose.yaml
+++ b/tests/docker-compose.yaml
@@ -4,4 +4,36 @@ services:
memcached:
image: memcached:latest
ports:
- - "0.0.0.0:11211:11211" \ No newline at end of file
+ - "0.0.0.0:11211:11211"
+ cassandra:
+ image: cassandra:3.11
+ ports:
+ - "9042:9042"
+ temporal:
+ image: temporalio/auto-setup:${SERVER_TAG:-1.1.0}
+ ports:
+ - "7233:7233"
+ volumes:
+ - ${DYNAMIC_CONFIG_DIR:-../config/dynamicconfig}:/etc/temporal/config/dynamicconfig
+ environment:
+ - "CASSANDRA_SEEDS=cassandra"
+ - "DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development.yaml"
+ depends_on:
+ - cassandra
+ temporal-admin-tools:
+ image: temporalio/admin-tools:${SERVER_TAG:-1.1.0}
+ stdin_open: true
+ tty: true
+ environment:
+ - "TEMPORAL_CLI_ADDRESS=temporal:7233"
+ depends_on:
+ - temporal
+ temporal-web:
+ image: temporalio/web:${WEB_TAG:-1.5.0}
+ environment:
+ - "TEMPORAL_GRPC_ENDPOINT=temporal:7233"
+ - "TEMPORAL_PERMIT_WRITE_API=true"
+ ports:
+ - "8088:8088"
+ depends_on:
+ - temporal \ No newline at end of file
diff --git a/tests/plugins/http/handler_test.go b/tests/plugins/http/handler_test.go
index e47dbd44..fc672e36 100644
--- a/tests/plugins/http/handler_test.go
+++ b/tests/plugins/http/handler_test.go
@@ -156,7 +156,7 @@ func TestHandler_Empty_User_Agent(t *testing.T) {
}, nil, p)
assert.NoError(t, err)
- hs := &http.Server{Addr: ":8088", Handler: h}
+ hs := &http.Server{Addr: ":19658", Handler: h}
defer func() {
err := hs.Shutdown(context.Background())
if err != nil {
@@ -172,7 +172,7 @@ func TestHandler_Empty_User_Agent(t *testing.T) {
}()
time.Sleep(time.Millisecond * 10)
- req, err := http.NewRequest("GET", "http://localhost:8088?hello=world", nil)
+ req, err := http.NewRequest("GET", "http://localhost:19658?hello=world", nil)
assert.NoError(t, err)
req.Header.Add("user-agent", "")
@@ -216,7 +216,7 @@ func TestHandler_User_Agent(t *testing.T) {
}, nil, p)
assert.NoError(t, err)
- hs := &http.Server{Addr: ":8088", Handler: h}
+ hs := &http.Server{Addr: ":25688", Handler: h}
defer func() {
err := hs.Shutdown(context.Background())
if err != nil {
@@ -232,7 +232,7 @@ func TestHandler_User_Agent(t *testing.T) {
}()
time.Sleep(time.Millisecond * 10)
- req, err := http.NewRequest("GET", "http://localhost:8088?hello=world", nil)
+ req, err := http.NewRequest("GET", "http://localhost:25688?hello=world", nil)
assert.NoError(t, err)
req.Header.Add("User-Agent", "go-agent")
diff --git a/tests/plugins/temporal/.rr.yaml b/tests/plugins/temporal/.rr.yaml
new file mode 100644
index 00000000..04d0730d
--- /dev/null
+++ b/tests/plugins/temporal/.rr.yaml
@@ -0,0 +1,22 @@
+# Application configuration
+rpc:
+ listen: tcp://127.0.0.1:6001
+
+server:
+ command: "php worker.php"
+
+# Workflow and activity mesh service
+temporal:
+ address: "localhost:7233"
+ activities:
+ num_workers: 4
+ codec: json
+ debug_level: 2
+
+logs:
+ mode: none
+ channels:
+ activities:
+ mode: none
+ #workflows:
+ #mode: none \ No newline at end of file
diff --git a/tests/plugins/temporal/cancel_test.go b/tests/plugins/temporal/cancel_test.go
new file mode 100644
index 00000000..0fd3c126
--- /dev/null
+++ b/tests/plugins/temporal/cancel_test.go
@@ -0,0 +1,291 @@
+package tests
+
+import (
+ "context"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+ "go.temporal.io/api/enums/v1"
+ "go.temporal.io/api/history/v1"
+ "go.temporal.io/sdk/client"
+)
+
+func Test_SimpleWorkflowCancel(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "SimpleSignalledWorkflow")
+ assert.NoError(t, err)
+
+ time.Sleep(time.Millisecond * 500)
+ err = s.Client().CancelWorkflow(context.Background(), w.GetID(), w.GetRunID())
+ assert.NoError(t, err)
+
+ var result interface{}
+ assert.Error(t, w.Get(context.Background(), &result))
+
+ we, err := s.Client().DescribeWorkflowExecution(context.Background(), w.GetID(), w.GetRunID())
+ assert.NoError(t, err)
+
+ assert.Equal(t, "Canceled", we.WorkflowExecutionInfo.Status.String())
+}
+
+func Test_CancellableWorkflowScope(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "CancelledScopeWorkflow",
+ "Hello World",
+ )
+ assert.NoError(t, err)
+
+ var result string
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, "yes", result)
+
+ s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool {
+ return event.EventType == enums.EVENT_TYPE_TIMER_CANCELED
+ })
+
+ s.AssertNotContainsEvent(t, w, func(event *history.HistoryEvent) bool {
+ return event.EventType == enums.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED
+ })
+}
+
+func Test_CancelledWorkflow(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "CancelledWorkflow",
+ "Hello World",
+ )
+ assert.NoError(t, err)
+
+ time.Sleep(time.Second)
+ err = s.Client().CancelWorkflow(context.Background(), w.GetID(), w.GetRunID())
+ assert.NoError(t, err)
+
+ var result interface{}
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, "CANCELLED", result)
+}
+
+func Test_CancelledWithCompensationWorkflow(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "CancelledWithCompensationWorkflow",
+ "Hello World",
+ )
+ assert.NoError(t, err)
+
+ time.Sleep(time.Second)
+ err = s.Client().CancelWorkflow(context.Background(), w.GetID(), w.GetRunID())
+ assert.NoError(t, err)
+
+ var result interface{}
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, "OK", result)
+
+ e, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "getStatus")
+ assert.NoError(t, err)
+
+ trace := make([]string, 0)
+ assert.NoError(t, e.Get(&trace))
+ assert.Equal(
+ t,
+ []string{
+ "yield",
+ "rollback",
+ "captured retry",
+ "captured promise on cancelled",
+ "START rollback",
+ "WAIT ROLLBACK",
+ "RESULT (ROLLBACK)", "DONE rollback",
+ "COMPLETE rollback",
+ "result: OK",
+ },
+ trace,
+ )
+}
+
+func Test_CancelledNestedWorkflow(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "CancelledNestedWorkflow",
+ )
+ assert.NoError(t, err)
+
+ time.Sleep(time.Second)
+
+ err = s.Client().CancelWorkflow(context.Background(), w.GetID(), w.GetRunID())
+ assert.NoError(t, err)
+
+ var result interface{}
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, "CANCELLED", result)
+
+ e, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "getStatus")
+ assert.NoError(t, err)
+
+ trace := make([]string, 0)
+ assert.NoError(t, e.Get(&trace))
+ assert.Equal(
+ t,
+ []string{
+ "begin",
+ "first scope",
+ "second scope",
+ "close second scope",
+ "close first scope",
+ "second scope cancelled",
+ "first scope cancelled",
+ "close process",
+ },
+ trace,
+ )
+}
+
+func Test_CancelledNSingleScopeWorkflow(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "CancelledSingleScopeWorkflow",
+ )
+ assert.NoError(t, err)
+
+ time.Sleep(time.Second)
+
+ err = s.Client().CancelWorkflow(context.Background(), w.GetID(), w.GetRunID())
+ assert.NoError(t, err)
+
+ var result interface{}
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, "OK", result)
+
+ e, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "getStatus")
+ assert.NoError(t, err)
+
+ trace := make([]string, 0)
+ assert.NoError(t, e.Get(&trace))
+ assert.Equal(
+ t,
+ []string{
+ "start",
+ "in scope",
+ "on cancel",
+ "captured in scope",
+ "captured in process",
+ },
+ trace,
+ )
+}
+
+func Test_CancelledMidflightWorkflow(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "CancelledMidflightWorkflow",
+ )
+ assert.NoError(t, err)
+
+ var result interface{}
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, "OK", result)
+
+ e, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "getStatus")
+ assert.NoError(t, err)
+
+ trace := make([]string, 0)
+ assert.NoError(t, e.Get(&trace))
+ assert.Equal(
+ t,
+ []string{
+ "start",
+ "in scope",
+ "on cancel",
+ "done cancel",
+ },
+ trace,
+ )
+
+ s.AssertNotContainsEvent(t, w, func(event *history.HistoryEvent) bool {
+ return event.EventType == enums.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED
+ })
+}
+
+func Test_CancelSignalledChildWorkflow(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "CancelSignalledChildWorkflow",
+ )
+ assert.NoError(t, err)
+
+ var result interface{}
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, "cancelled ok", result)
+
+ e, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "getStatus")
+ assert.NoError(t, err)
+
+ trace := make([]string, 0)
+ assert.NoError(t, e.Get(&trace))
+ assert.Equal(
+ t,
+ []string{
+ "start",
+ "child started",
+ "child signalled",
+ "scope cancelled",
+ "process done",
+ },
+ trace,
+ )
+
+ s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool {
+ return event.EventType == enums.EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED
+ })
+}
diff --git a/tests/plugins/temporal/child_test.go b/tests/plugins/temporal/child_test.go
new file mode 100644
index 00000000..49521791
--- /dev/null
+++ b/tests/plugins/temporal/child_test.go
@@ -0,0 +1,84 @@
+package tests
+
+import (
+ "context"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "go.temporal.io/sdk/client"
+)
+
+func Test_ExecuteChildWorkflow(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "WithChildWorkflow",
+ "Hello World",
+ )
+ assert.NoError(t, err)
+
+ var result string
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, "Child: CHILD HELLO WORLD", result)
+}
+
+func Test_ExecuteChildStubWorkflow(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "WithChildStubWorkflow",
+ "Hello World",
+ )
+ assert.NoError(t, err)
+
+ var result string
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, "Child: CHILD HELLO WORLD", result)
+}
+
+func Test_ExecuteChildStubWorkflow_02(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "ChildStubWorkflow",
+ "Hello World",
+ )
+ assert.NoError(t, err)
+
+ var result []string
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, []string{"HELLO WORLD", "UNTYPED"}, result)
+}
+
+func Test_SignalChildViaStubWorkflow(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "SignalChildViaStubWorkflow",
+ )
+ assert.NoError(t, err)
+
+ var result int
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, 8, result)
+}
diff --git a/tests/plugins/temporal/disaster_test.go b/tests/plugins/temporal/disaster_test.go
new file mode 100644
index 00000000..9ca4d018
--- /dev/null
+++ b/tests/plugins/temporal/disaster_test.go
@@ -0,0 +1,114 @@
+package tests
+
+import (
+ "context"
+ "os"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+ "go.temporal.io/sdk/client"
+)
+
+func Test_WorkerError_DisasterRecovery(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ p, err := os.FindProcess(int(s.workflows.Workers()[0].Pid()))
+ assert.NoError(t, err)
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "TimerWorkflow",
+ "Hello World",
+ )
+ assert.NoError(t, err)
+
+ time.Sleep(time.Millisecond * 750)
+
+ // must fully recover with new worker
+ assert.NoError(t, p.Kill())
+
+ var result string
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, "hello world", result)
+}
+
+func Test_WorkerError_DisasterRecovery_Heavy(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ defer func() {
+ // always restore script
+ _ = os.Rename("worker.bak", "worker.php")
+ }()
+
+ // Makes worker pool unable to recover for some time
+ _ = os.Rename("worker.php", "worker.bak")
+
+ p, err := os.FindProcess(int(s.workflows.Workers()[0].Pid()))
+ assert.NoError(t, err)
+
+ // must fully recover with new worker
+ assert.NoError(t, p.Kill())
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "TimerWorkflow",
+ "Hello World",
+ )
+ assert.NoError(t, err)
+
+ time.Sleep(time.Millisecond * 750)
+
+ // restore the script and recover activity pool
+ _ = os.Rename("worker.bak", "worker.php")
+
+ var result string
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, "hello world", result)
+}
+
+func Test_ActivityError_DisasterRecovery(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ defer func() {
+ // always restore script
+ _ = os.Rename("worker.bak", "worker.php")
+ }()
+
+ // Makes worker pool unable to recover for some time
+ _ = os.Rename("worker.php", "worker.bak")
+
+ // destroys all workers in activities
+ for _, wrk := range s.activities.Workers() {
+ assert.NoError(t, wrk.Kill())
+ }
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "SimpleWorkflow",
+ "Hello World",
+ )
+ assert.NoError(t, err)
+
+ // activity can't complete at this moment
+ time.Sleep(time.Millisecond * 750)
+
+ // restore the script and recover activity pool
+ _ = os.Rename("worker.bak", "worker.php")
+
+ var result string
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, "HELLO WORLD", result)
+}
diff --git a/tests/plugins/temporal/hp_test.go b/tests/plugins/temporal/hp_test.go
new file mode 100644
index 00000000..bceac025
--- /dev/null
+++ b/tests/plugins/temporal/hp_test.go
@@ -0,0 +1,408 @@
+package tests
+
+import (
+ "context"
+ "crypto/rand"
+ "crypto/sha512"
+ "fmt"
+ "testing"
+ "time"
+
+ "go.temporal.io/api/common/v1"
+
+ "github.com/fatih/color"
+ "github.com/stretchr/testify/assert"
+ "go.temporal.io/api/enums/v1"
+ "go.temporal.io/api/history/v1"
+ "go.temporal.io/sdk/client"
+)
+
+func init() {
+ color.NoColor = false
+}
+
+func Test_VerifyRegistration(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ assert.Contains(t, s.workflows.WorkflowNames(), "SimpleWorkflow")
+
+ assert.Contains(t, s.activities.ActivityNames(), "SimpleActivity.echo")
+ assert.Contains(t, s.activities.ActivityNames(), "HeartBeatActivity.doSomething")
+
+ assert.Contains(t, s.activities.ActivityNames(), "SimpleActivity.lower")
+}
+
+func Test_ExecuteSimpleWorkflow_1(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "SimpleWorkflow",
+ "Hello World",
+ )
+ assert.NoError(t, err)
+
+ var result string
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, "HELLO WORLD", result)
+}
+
+type User struct {
+ Name string
+ Email string
+}
+
+func Test_ExecuteSimpleDTOWorkflow(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "SimpleDTOWorkflow",
+ User{
+ Name: "Antony",
+ Email: "[email protected]",
+ },
+ )
+ assert.NoError(t, err)
+
+ var result struct{ Message string }
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, "Hello Antony <[email protected]>", result.Message)
+}
+
+func Test_ExecuteSimpleWorkflowWithSequenceInBatch(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "WorkflowWithSequence",
+ "Hello World",
+ )
+ assert.NoError(t, err)
+
+ var result string
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, "OK", result)
+}
+
+func Test_MultipleWorkflowsInSingleWorker(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "SimpleWorkflow",
+ "Hello World",
+ )
+ assert.NoError(t, err)
+
+ w2, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "TimerWorkflow",
+ "Hello World",
+ )
+ assert.NoError(t, err)
+
+ var result string
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, "HELLO WORLD", result)
+
+ assert.NoError(t, w2.Get(context.Background(), &result))
+ assert.Equal(t, "hello world", result)
+}
+
+func Test_ExecuteWorkflowWithParallelScopes(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "ParallelScopesWorkflow",
+ "Hello World",
+ )
+ assert.NoError(t, err)
+
+ var result string
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, "HELLO WORLD|Hello World|hello world", result)
+}
+
+func Test_Timer(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ start := time.Now()
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "TimerWorkflow",
+ "Hello World",
+ )
+ assert.NoError(t, err)
+
+ var result string
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, "hello world", result)
+ assert.True(t, time.Since(start).Seconds() > 1)
+
+ s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool {
+ return event.EventType == enums.EVENT_TYPE_TIMER_STARTED
+ })
+}
+
+func Test_SideEffect(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "SideEffectWorkflow",
+ "Hello World",
+ )
+ assert.NoError(t, err)
+
+ var result string
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Contains(t, result, "hello world-")
+
+ s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool {
+ return event.EventType == enums.EVENT_TYPE_MARKER_RECORDED
+ })
+}
+
+func Test_EmptyWorkflow(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "EmptyWorkflow",
+ "Hello World",
+ )
+ assert.NoError(t, err)
+
+ var result int
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, 42, result)
+}
+
+func Test_PromiseChaining(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "ChainedWorkflow",
+ "Hello World",
+ )
+ assert.NoError(t, err)
+
+ var result string
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, "result:hello world", result)
+}
+
+func Test_ActivityHeartbeat(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "SimpleHeartbeatWorkflow",
+ 2,
+ )
+ assert.NoError(t, err)
+
+ time.Sleep(time.Second)
+
+ we, err := s.Client().DescribeWorkflowExecution(context.Background(), w.GetID(), w.GetRunID())
+ assert.NoError(t, err)
+ assert.Len(t, we.PendingActivities, 1)
+
+ act := we.PendingActivities[0]
+
+ assert.Equal(t, `{"value":2}`, string(act.HeartbeatDetails.Payloads[0].Data))
+
+ var result string
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, "OK", result)
+}
+
+func Test_FailedActivityHeartbeat(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "FailedHeartbeatWorkflow",
+ 8,
+ )
+ assert.NoError(t, err)
+
+ time.Sleep(time.Second)
+
+ we, err := s.Client().DescribeWorkflowExecution(context.Background(), w.GetID(), w.GetRunID())
+ assert.NoError(t, err)
+ assert.Len(t, we.PendingActivities, 1)
+
+ act := we.PendingActivities[0]
+
+ assert.Equal(t, `{"value":8}`, string(act.HeartbeatDetails.Payloads[0].Data))
+
+ var result string
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, "OK!", result)
+}
+
+func Test_BinaryPayload(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ rnd := make([]byte, 2500)
+
+ _, err := rand.Read(rnd)
+ assert.NoError(t, err)
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "BinaryWorkflow",
+ rnd,
+ )
+ assert.NoError(t, err)
+
+ var result string
+ assert.NoError(t, w.Get(context.Background(), &result))
+
+ assert.Equal(t, fmt.Sprintf("%x", sha512.Sum512(rnd)), result)
+}
+
+func Test_ContinueAsNew(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "ContinuableWorkflow",
+ 1,
+ )
+ assert.NoError(t, err)
+
+ time.Sleep(time.Second)
+
+ we, err := s.Client().DescribeWorkflowExecution(context.Background(), w.GetID(), w.GetRunID())
+ assert.NoError(t, err)
+
+ assert.Equal(t, "ContinuedAsNew", we.WorkflowExecutionInfo.Status.String())
+
+ time.Sleep(time.Second)
+
+ // the result of the final workflow
+ var result string
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, "OK6", result)
+}
+
+func Test_ActivityStubWorkflow(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "ActivityStubWorkflow",
+ "hello world",
+ )
+ assert.NoError(t, err)
+
+ // the result of the final workflow
+ var result []string
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, []string{
+ "HELLO WORLD",
+ "invalid method call",
+ "UNTYPED",
+ }, result)
+}
+
+func Test_ExecuteProtoWorkflow(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "ProtoPayloadWorkflow",
+ )
+ assert.NoError(t, err)
+
+ var result common.WorkflowExecution
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, "updated", result.RunId)
+ assert.Equal(t, "workflow id", result.WorkflowId)
+}
+
+func Test_SagaWorkflow(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "SagaWorkflow",
+ )
+ assert.NoError(t, err)
+
+ var result string
+ assert.Error(t, w.Get(context.Background(), &result))
+}
diff --git a/tests/plugins/temporal/query_test.go b/tests/plugins/temporal/query_test.go
new file mode 100644
index 00000000..8b0caeee
--- /dev/null
+++ b/tests/plugins/temporal/query_test.go
@@ -0,0 +1,66 @@
+package tests
+
+import (
+ "context"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+ "go.temporal.io/sdk/client"
+)
+
+func Test_ListQueries(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "QueryWorkflow",
+ "Hello World",
+ )
+ assert.NoError(t, err)
+
+ time.Sleep(time.Millisecond * 500)
+
+ v, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "error", -1)
+ assert.Nil(t, v)
+ assert.Error(t, err)
+
+ assert.Contains(t, err.Error(), "KnownQueryTypes=[get]")
+
+ var r int
+ assert.NoError(t, w.Get(context.Background(), &r))
+ assert.Equal(t, 0, r)
+}
+
+func Test_GetQuery(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "QueryWorkflow",
+ "Hello World",
+ )
+ assert.NoError(t, err)
+
+ err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "add", 88)
+ assert.NoError(t, err)
+ time.Sleep(time.Millisecond * 500)
+
+ v, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "get", nil)
+ assert.NoError(t, err)
+
+ var r int
+ assert.NoError(t, v.Get(&r))
+ assert.Equal(t, 88, r)
+
+ assert.NoError(t, w.Get(context.Background(), &r))
+ assert.Equal(t, 88, r)
+}
diff --git a/tests/plugins/temporal/server_test.go b/tests/plugins/temporal/server_test.go
new file mode 100644
index 00000000..c8d815c3
--- /dev/null
+++ b/tests/plugins/temporal/server_test.go
@@ -0,0 +1,198 @@
+package tests
+
+import (
+ "context"
+ "testing"
+
+ endure "github.com/spiral/endure/pkg/container"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/informer"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/resetter"
+ "github.com/spiral/roadrunner/v2/plugins/rpc"
+ "github.com/spiral/roadrunner/v2/plugins/server"
+ "github.com/spiral/roadrunner/v2/plugins/temporal/activity"
+ rrClient "github.com/spiral/roadrunner/v2/plugins/temporal/client"
+ "github.com/spiral/roadrunner/v2/plugins/temporal/workflow"
+ "go.temporal.io/api/enums/v1"
+ "go.temporal.io/api/history/v1"
+ temporalClient "go.temporal.io/sdk/client"
+ "go.uber.org/zap"
+ "go.uber.org/zap/zapcore"
+)
+
+type TestServer struct {
+ container endure.Container
+ temporal rrClient.Temporal
+ activities *activity.Plugin
+ workflows *workflow.Plugin
+}
+
+type ConfigOption struct {
+ Name string
+ Value interface{}
+}
+
+func NewOption(name string, value interface{}) ConfigOption {
+ return ConfigOption{Name: name, Value: value}
+}
+
+func NewTestServer(opt ...ConfigOption) *TestServer {
+ e, err := endure.NewContainer(initLogger(), endure.RetryOnFail(false))
+ if err != nil {
+ panic(err)
+ }
+
+ t := &rrClient.Plugin{}
+ a := &activity.Plugin{}
+ w := &workflow.Plugin{}
+
+ if err := e.Register(initConfig(opt...)); err != nil {
+ panic(err)
+ }
+
+ if err := e.Register(&logger.ZapLogger{}); err != nil {
+ panic(err)
+ }
+ if err := e.Register(&resetter.Plugin{}); err != nil {
+ panic(err)
+ }
+ if err := e.Register(&informer.Plugin{}); err != nil {
+ panic(err)
+ }
+ if err := e.Register(&server.Plugin{}); err != nil {
+ panic(err)
+ }
+ if err := e.Register(&rpc.Plugin{}); err != nil {
+ panic(err)
+ }
+
+ if err := e.Register(t); err != nil {
+ panic(err)
+ }
+ if err := e.Register(a); err != nil {
+ panic(err)
+ }
+ if err := e.Register(w); err != nil {
+ panic(err)
+ }
+
+ if err := e.Init(); err != nil {
+ panic(err)
+ }
+
+ errCh, err := e.Serve()
+ if err != nil {
+ panic(err)
+ }
+
+ go func() {
+ err := <-errCh
+ er := e.Stop()
+ if er != nil {
+ panic(err)
+ }
+ }()
+
+ return &TestServer{container: e, temporal: t, activities: a, workflows: w}
+}
+
+func (s *TestServer) Client() temporalClient.Client {
+ return s.temporal.GetClient()
+}
+
+func (s *TestServer) MustClose() {
+ err := s.container.Stop()
+ if err != nil {
+ panic(err)
+ }
+}
+
+func initConfig(opt ...ConfigOption) config.Configurer {
+ cfg := &config.Viper{}
+ cfg.Path = ".rr.yaml"
+ cfg.Prefix = "rr"
+
+ return cfg
+}
+
+func initLogger() *zap.Logger {
+ cfg := zap.Config{
+ Level: zap.NewAtomicLevelAt(zap.ErrorLevel),
+ Encoding: "console",
+ EncoderConfig: zapcore.EncoderConfig{
+ MessageKey: "message",
+ LevelKey: "level",
+ TimeKey: "time",
+ CallerKey: "caller",
+ NameKey: "name",
+ StacktraceKey: "stack",
+ EncodeLevel: zapcore.CapitalLevelEncoder,
+ EncodeTime: zapcore.ISO8601TimeEncoder,
+ EncodeCaller: zapcore.ShortCallerEncoder,
+ },
+ OutputPaths: []string{"stderr"},
+ ErrorOutputPaths: []string{"stderr"},
+ }
+
+ l, err := cfg.Build(zap.AddCaller())
+ if err != nil {
+ panic(err)
+ }
+
+ return l
+}
+
+func (s *TestServer) AssertContainsEvent(t *testing.T, w temporalClient.WorkflowRun, assert func(*history.HistoryEvent) bool) {
+ i := s.Client().GetWorkflowHistory(
+ context.Background(),
+ w.GetID(),
+ w.GetRunID(),
+ false,
+ enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT,
+ )
+
+ for {
+ if !i.HasNext() {
+ t.Error("no more events and no match found")
+ break
+ }
+
+ e, err := i.Next()
+ if err != nil {
+ t.Error("unable to read history event")
+ break
+ }
+
+ if assert(e) {
+ break
+ }
+ }
+}
+
+func (s *TestServer) AssertNotContainsEvent(t *testing.T, w temporalClient.WorkflowRun, assert func(*history.HistoryEvent) bool) {
+ i := s.Client().GetWorkflowHistory(
+ context.Background(),
+ w.GetID(),
+ w.GetRunID(),
+ false,
+ enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT,
+ )
+
+ for {
+ if !i.HasNext() {
+ break
+ }
+
+ e, err := i.Next()
+ if err != nil {
+ t.Error("unable to read history event")
+ break
+ }
+
+ if assert(e) {
+ t.Error("found unexpected event")
+ break
+ }
+ }
+}
diff --git a/tests/plugins/temporal/signal_test.go b/tests/plugins/temporal/signal_test.go
new file mode 100644
index 00000000..51826287
--- /dev/null
+++ b/tests/plugins/temporal/signal_test.go
@@ -0,0 +1,170 @@
+package tests
+
+import (
+ "context"
+ "testing"
+ "time"
+
+ "github.com/pborman/uuid"
+ "github.com/stretchr/testify/assert"
+ "go.temporal.io/api/enums/v1"
+ "go.temporal.io/api/history/v1"
+ "go.temporal.io/sdk/client"
+)
+
+func Test_SignalsWithoutSignals(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "SimpleSignalledWorkflow",
+ "Hello World",
+ )
+ assert.NoError(t, err)
+
+ var result int
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, 0, result)
+}
+
+func Test_SendSignalDuringTimer(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().SignalWithStartWorkflow(
+ context.Background(),
+ "signalled-"+uuid.New(),
+ "add",
+ 10,
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "SimpleSignalledWorkflow",
+ )
+ assert.NoError(t, err)
+
+ err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "add", -1)
+ assert.NoError(t, err)
+
+ var result int
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, 9, result)
+
+ s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool {
+ if event.EventType == enums.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED {
+ attr := event.Attributes.(*history.HistoryEvent_WorkflowExecutionSignaledEventAttributes)
+ return attr.WorkflowExecutionSignaledEventAttributes.SignalName == "add"
+ }
+
+ return false
+ })
+}
+
+func Test_SendSignalBeforeCompletingWorkflow(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "SimpleSignalledWorkflowWithSleep",
+ )
+ assert.NoError(t, err)
+
+ // should be around sleep(1) call
+ time.Sleep(time.Second + time.Millisecond*200)
+
+ err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "add", -1)
+ assert.NoError(t, err)
+
+ var result int
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, -1, result)
+
+ s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool {
+ if event.EventType == enums.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED {
+ attr := event.Attributes.(*history.HistoryEvent_WorkflowExecutionSignaledEventAttributes)
+ return attr.WorkflowExecutionSignaledEventAttributes.SignalName == "add"
+ }
+
+ return false
+ })
+}
+
+func Test_RuntimeSignal(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().SignalWithStartWorkflow(
+ context.Background(),
+ "signalled-"+uuid.New(),
+ "add",
+ -1,
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "RuntimeSignalWorkflow",
+ )
+ assert.NoError(t, err)
+
+ var result int
+ assert.NoError(t, w.Get(context.Background(), &result))
+ assert.Equal(t, -1, result)
+
+ s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool {
+ if event.EventType == enums.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED {
+ attr := event.Attributes.(*history.HistoryEvent_WorkflowExecutionSignaledEventAttributes)
+ return attr.WorkflowExecutionSignaledEventAttributes.SignalName == "add"
+ }
+
+ return false
+ })
+}
+
+func Test_SignalSteps(t *testing.T) {
+ s := NewTestServer()
+ defer s.MustClose()
+
+ w, err := s.Client().ExecuteWorkflow(
+ context.Background(),
+ client.StartWorkflowOptions{
+ TaskQueue: "default",
+ },
+ "WorkflowWithSignalledSteps",
+ )
+ assert.NoError(t, err)
+
+ err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "begin", true)
+ assert.NoError(t, err)
+
+ err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "next1", true)
+ assert.NoError(t, err)
+
+ v, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "value", nil)
+ assert.NoError(t, err)
+
+ var r int
+ assert.NoError(t, v.Get(&r))
+ assert.Equal(t, 2, r)
+
+ err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "next2", true)
+ assert.NoError(t, err)
+
+ v, err = s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "value", nil)
+ assert.NoError(t, err)
+
+ assert.NoError(t, v.Get(&r))
+ assert.Equal(t, 3, r)
+
+ var result int
+ assert.NoError(t, w.Get(context.Background(), &result))
+
+ // 3 ticks
+ assert.Equal(t, 3, result)
+}
diff --git a/tests/plugins/temporal/worker.php b/tests/plugins/temporal/worker.php
new file mode 100644
index 00000000..0d0263e7
--- /dev/null
+++ b/tests/plugins/temporal/worker.php
@@ -0,0 +1,33 @@
+<?php
+
+declare(strict_types=1);
+
+require __DIR__ . '/../../vendor/autoload.php';
+
+/**
+ * @param string $dir
+ * @return array<string>
+ */
+$getClasses = static function (string $dir): iterable {
+ $files = glob($dir . '/*.php');
+
+ foreach ($files as $file) {
+ yield substr(basename($file), 0, -4);
+ }
+};
+
+$factory = \Temporal\WorkerFactory::create();
+
+$worker = $factory->newWorker('default');
+
+// register all workflows
+foreach ($getClasses(__DIR__ . '/../../temporal/Workflow') as $name) {
+ $worker->registerWorkflowType('Temporal\\Tests\\Workflow\\' . $name);
+}
+
+// register all activity
+foreach ($getClasses(__DIR__ . '/../../temporal/Activity') as $name) {
+ $worker->registerActivityType('Temporal\\Tests\\Activity\\' . $name);
+}
+
+$factory->run();
diff --git a/tests/psr-worker-bench.php b/tests/psr-worker-bench.php
index d382098a..ef741a61 100644
--- a/tests/psr-worker-bench.php
+++ b/tests/psr-worker-bench.php
@@ -1,24 +1,58 @@
<?php
+declare(strict_types=1);
+
use Spiral\RoadRunner;
use Nyholm\Psr7\Factory;
ini_set('display_errors', 'stderr');
include "vendor/autoload.php";
-$worker = new RoadRunner\Http\PSR7Worker(
- RoadRunner\Worker::create(),
- new Factory\Psr17Factory(),
- new Factory\Psr17Factory(),
- new Factory\Psr17Factory()
-);
-
-while ($req = $worker->waitRequest()) {
- try {
- $rsp = new \Nyholm\Psr7\Response();
- $rsp->getBody()->write("hello world");
- $worker->respond($rsp);
- } catch (\Throwable $e) {
- $worker->getWorker()->error((string)$e);
+$env = \Spiral\RoadRunner\Environment::fromGlobals();
+
+if ($env->getMode() === 'http') {
+ $worker = new RoadRunner\Http\PSR7Worker(
+ RoadRunner\Worker::create(),
+ new Factory\Psr17Factory(),
+ new Factory\Psr17Factory(),
+ new Factory\Psr17Factory()
+ );
+
+ while ($req = $worker->waitRequest()) {
+ try {
+ $rsp = new \Nyholm\Psr7\Response();
+ $rsp->getBody()->write("hello world");
+ $worker->respond($rsp);
+ } catch (\Throwable $e) {
+ $worker->getWorker()->error((string)$e);
+ }
}
+} else {
+ /**
+ * @param string $dir
+ * @return array<string>
+ */
+ $getClasses = static function (string $dir): iterable {
+ $files = glob($dir . '/*.php');
+
+ foreach ($files as $file) {
+ yield substr(basename($file), 0, -4);
+ }
+ };
+
+ $factory = \Temporal\WorkerFactory::create();
+
+ $worker = $factory->newWorker('default');
+
+ // register all workflows
+ foreach ($getClasses(__DIR__ . '/../temporal/Workflow') as $name) {
+ $worker->registerWorkflowType('Temporal\\Tests\\Workflow\\' . $name);
+ }
+
+ // register all activity
+ foreach ($getClasses(__DIR__ . '/../temporal/Activity') as $name) {
+ $worker->registerActivityType('Temporal\\Tests\\Activity\\' . $name);
+ }
+
+ $factory->run();
} \ No newline at end of file
diff --git a/tests/temporal/Activity/HeartBeatActivity.php b/tests/temporal/Activity/HeartBeatActivity.php
new file mode 100644
index 00000000..acf4a451
--- /dev/null
+++ b/tests/temporal/Activity/HeartBeatActivity.php
@@ -0,0 +1,58 @@
+<?php
+
+namespace Temporal\Tests\Activity;
+
+use Temporal\Activity;
+use Temporal\Activity\ActivityInterface;
+use Temporal\Activity\ActivityMethod;
+use Temporal\Roadrunner\Internal\Error;
+
+#[ActivityInterface(prefix: "HeartBeatActivity.")]
+class HeartBeatActivity
+{
+ #[ActivityMethod]
+ public function doSomething(
+ int $value
+ ): string {
+ Activity::heartbeat(['value' => $value]);
+ sleep($value);
+ return 'OK';
+ }
+
+ #[ActivityMethod]
+ public function slow(
+ string $value
+ ): string {
+ for ($i = 0; $i < 5; $i++) {
+ Activity::heartbeat(['value' => $i]);
+ sleep(1);
+ }
+
+ return 'OK';
+ }
+
+ #[ActivityMethod]
+ public function something(
+ string $value
+ ): string {
+ Activity::heartbeat(['value' => $value]);
+ sleep($value);
+ return 'OK';
+ }
+
+ #[ActivityMethod]
+ public function failedActivity(
+ int $value
+ ): string {
+ Activity::heartbeat(['value' => $value]);
+ if (Activity::getInfo()->attempt === 1) {
+ throw new \Error("failed");
+ }
+
+ if (!is_array(Activity::getHeartbeatDetails())) {
+ throw new \Error("no heartbeat details");
+ }
+
+ return 'OK!';
+ }
+} \ No newline at end of file
diff --git a/tests/temporal/Activity/SimpleActivity.php b/tests/temporal/Activity/SimpleActivity.php
new file mode 100644
index 00000000..576b126e
--- /dev/null
+++ b/tests/temporal/Activity/SimpleActivity.php
@@ -0,0 +1,63 @@
+<?php
+
+namespace Temporal\Tests\Activity;
+
+use Temporal\Activity\ActivityInterface;
+use Temporal\Activity\ActivityMethod;
+use Temporal\Api\Common\V1\WorkflowExecution;
+use Temporal\DataConverter\Bytes;
+use Temporal\Tests\DTO\Message;
+use Temporal\Tests\DTO\User;
+
+#[ActivityInterface(prefix: "SimpleActivity.")]
+class SimpleActivity
+{
+ #[ActivityMethod]
+ public function echo(
+ string $input
+ ): string {
+ return strtoupper($input);
+ }
+
+ #[ActivityMethod]
+ public function lower(
+ string $input
+ ): string {
+ return strtolower($input);
+ }
+
+ #[ActivityMethod]
+ public function greet(
+ User $user
+ ): Message {
+ return new Message(sprintf("Hello %s <%s>", $user->name, $user->email));
+ }
+
+ #[ActivityMethod]
+ public function slow(
+ string $input
+ ): string {
+ sleep(2);
+
+ return strtolower($input);
+ }
+
+ #[ActivityMethod]
+ public function sha512(
+ Bytes $input
+ ): string {
+ return hash("sha512", ($input->getData()));
+ }
+
+ public function updateRunID(WorkflowExecution $e): WorkflowExecution
+ {
+ $e->setRunId('updated');
+ return $e;
+ }
+
+ #[ActivityMethod]
+ public function fail()
+ {
+ throw new \Error("failed activity");
+ }
+} \ No newline at end of file
diff --git a/tests/temporal/Client/StartNewWorkflow.php b/tests/temporal/Client/StartNewWorkflow.php
new file mode 100644
index 00000000..67bc1d01
--- /dev/null
+++ b/tests/temporal/Client/StartNewWorkflow.php
@@ -0,0 +1,23 @@
+<?php
+
+
+namespace Temporal\Tests\Client;
+
+use Temporal\Client;
+use Temporal\Tests\Workflow\SimpleDTOWorkflow;
+
+use function Symfony\Component\String\s;
+
+class StartNewWorkflow
+{
+ private $stub;
+
+ public function __construct(Client\ClientInterface $client)
+ {
+ $this->stub = $client->newWorkflowStub(SimpleDTOWorkflow::class);
+ }
+
+ public function __invoke()
+ {
+ }
+}
diff --git a/tests/temporal/DTO/Message.php b/tests/temporal/DTO/Message.php
new file mode 100644
index 00000000..61703fe8
--- /dev/null
+++ b/tests/temporal/DTO/Message.php
@@ -0,0 +1,14 @@
+<?php
+
+
+namespace Temporal\Tests\DTO;
+
+class Message
+{
+ public string $message;
+
+ public function __construct(string $message)
+ {
+ $this->message = $message;
+ }
+} \ No newline at end of file
diff --git a/tests/temporal/DTO/User.php b/tests/temporal/DTO/User.php
new file mode 100644
index 00000000..cefea137
--- /dev/null
+++ b/tests/temporal/DTO/User.php
@@ -0,0 +1,15 @@
+<?php
+
+
+namespace Temporal\Tests\DTO;
+
+use Temporal\Internal\Marshaller\Meta\Marshal;
+
+class User
+{
+ #[Marshal(name: "Name")]
+ public string $name;
+
+ #[Marshal(name: "Email")]
+ public string $email;
+} \ No newline at end of file
diff --git a/tests/temporal/Workflow/ActivityStubWorkflow.php b/tests/temporal/Workflow/ActivityStubWorkflow.php
new file mode 100644
index 00000000..58dcdafb
--- /dev/null
+++ b/tests/temporal/Workflow/ActivityStubWorkflow.php
@@ -0,0 +1,39 @@
+<?php
+
+namespace Temporal\Tests\Workflow;
+
+use Temporal\Activity\ActivityOptions;
+use Temporal\Tests\Activity\SimpleActivity;
+use Temporal\Workflow;
+use Temporal\Workflow\WorkflowMethod;
+
+#[Workflow\WorkflowInterface]
+class ActivityStubWorkflow
+{
+ #[WorkflowMethod(name: 'ActivityStubWorkflow')]
+ public function handler(
+ string $input
+ ) {
+ // typed stub
+ $simple = Workflow::newActivityStub(
+ SimpleActivity::class,
+ ActivityOptions::new()->withStartToCloseTimeout(5)
+ );
+
+ $result = [];
+ $result[] = yield $simple->echo($input);
+
+ try {
+ $simple->undefined($input);
+ } catch (\BadMethodCallException $e) {
+ $result[] = 'invalid method call';
+ }
+
+ // untyped stub
+ $untyped = Workflow::newUntypedActivityStub(ActivityOptions::new()->withStartToCloseTimeout(1));
+
+ $result[] = yield $untyped->execute('SimpleActivity.echo', ['untyped']);
+
+ return $result;
+ }
+}
diff --git a/tests/temporal/Workflow/AggregatedWorkflow.php b/tests/temporal/Workflow/AggregatedWorkflow.php
new file mode 100644
index 00000000..3299179e
--- /dev/null
+++ b/tests/temporal/Workflow/AggregatedWorkflow.php
@@ -0,0 +1,30 @@
+<?php
+
+namespace Temporal\Tests\Workflow;
+
+use Temporal\Workflow;
+use Temporal\Workflow\SignalMethod;
+use Temporal\Workflow\WorkflowInterface;
+use Temporal\Workflow\WorkflowMethod;
+
+#[WorkflowInterface]
+class AggregatedWorkflow
+{
+ private array $values = [];
+
+ #[SignalMethod]
+ public function addValue(
+ string $value
+ ) {
+ $this->values[] = $value;
+ }
+
+ #[WorkflowMethod(name: 'AggregatedWorkflow')]
+ public function run(
+ int $count
+ ) {
+ yield Workflow::await(fn() => count($this->values) === $count);
+
+ return $this->values;
+ }
+}
diff --git a/tests/temporal/Workflow/AsyncActivityWorkflow.php b/tests/temporal/Workflow/AsyncActivityWorkflow.php
new file mode 100644
index 00000000..79e45dfb
--- /dev/null
+++ b/tests/temporal/Workflow/AsyncActivityWorkflow.php
@@ -0,0 +1,28 @@
+<?php
+
+declare(strict_types=1);
+
+namespace Temporal\Tests\Workflow;
+
+use Temporal\Activity\ActivityCancellationType;
+use Temporal\Activity\ActivityOptions;
+use Temporal\Workflow;
+use Temporal\Workflow\WorkflowMethod;
+use Temporal\Tests\Activity\SimpleActivity;
+
+#[Workflow\WorkflowInterface]
+class AsyncActivityWorkflow
+{
+ #[WorkflowMethod(name: 'AsyncActivityWorkflow')]
+ public function handler()
+ {
+ $simple = Workflow::newActivityStub(
+ SimpleActivity::class,
+ ActivityOptions::new()
+ ->withStartToCloseTimeout(20)
+ ->withCancellationType(ActivityCancellationType::WAIT_CANCELLATION_COMPLETED)
+ );
+
+ return yield $simple->external();
+ }
+}
diff --git a/tests/temporal/Workflow/BinaryWorkflow.php b/tests/temporal/Workflow/BinaryWorkflow.php
new file mode 100644
index 00000000..ed1952ad
--- /dev/null
+++ b/tests/temporal/Workflow/BinaryWorkflow.php
@@ -0,0 +1,21 @@
+<?php
+
+namespace Temporal\Tests\Workflow;
+
+use Temporal\Activity\ActivityOptions;
+use Temporal\DataConverter\Bytes;
+use Temporal\Workflow;
+use Temporal\Workflow\WorkflowMethod;
+
+#[Workflow\WorkflowInterface]
+class BinaryWorkflow
+{
+ #[WorkflowMethod(name: 'BinaryWorkflow')]
+ public function handler(
+ Bytes $input
+ ): iterable {
+ $opts = ActivityOptions::new()->withStartToCloseTimeout(5);
+
+ return yield Workflow::executeActivity('SimpleActivity.sha512', [$input], $opts);
+ }
+}
diff --git a/tests/temporal/Workflow/CancelSignalledChildWorkflow.php b/tests/temporal/Workflow/CancelSignalledChildWorkflow.php
new file mode 100644
index 00000000..e2e43efa
--- /dev/null
+++ b/tests/temporal/Workflow/CancelSignalledChildWorkflow.php
@@ -0,0 +1,57 @@
+<?php
+
+namespace Temporal\Tests\Workflow;
+
+use React\Promise\Deferred;
+use Temporal\Workflow;
+use Temporal\Workflow\WorkflowMethod;
+
+#[Workflow\WorkflowInterface]
+class CancelSignalledChildWorkflow
+{
+ private array $status = [];
+
+ #[Workflow\QueryMethod(name: 'getStatus')]
+ public function getStatus(): array
+ {
+ return $this->status;
+ }
+
+ #[WorkflowMethod(name: 'CancelSignalledChildWorkflow')]
+ public function handler()
+ {
+ // typed stub
+ $simple = Workflow::newChildWorkflowStub(SimpleSignalledWorkflow::class);
+
+ $waitSignalled = new Deferred();
+
+ $this->status[] = 'start';
+
+ // start execution
+ $scope = Workflow::newCancellationScope(
+ function () use ($simple, $waitSignalled) {
+ $call = $simple->handler();
+ $this->status[] = 'child started';
+
+ yield $simple->add(8);
+ $this->status[] = 'child signalled';
+ $waitSignalled->resolve();
+
+ return yield $call;
+ }
+ );
+
+ // only cancel scope when signal dispatched
+ yield $waitSignalled;
+ $scope->cancel();
+ $this->status[] = 'scope cancelled';
+
+ try {
+ return yield $scope;
+ } catch (\Throwable $e) {
+ $this->status[] = 'process done';
+
+ return 'cancelled ok';
+ }
+ }
+}
diff --git a/tests/temporal/Workflow/CanceledHeartbeatWorkflow.php b/tests/temporal/Workflow/CanceledHeartbeatWorkflow.php
new file mode 100644
index 00000000..6b463192
--- /dev/null
+++ b/tests/temporal/Workflow/CanceledHeartbeatWorkflow.php
@@ -0,0 +1,29 @@
+<?php
+
+declare(strict_types=1);
+
+namespace Temporal\Tests\Workflow;
+
+use Temporal\Activity\ActivityCancellationType;
+use Temporal\Activity\ActivityOptions;
+use Temporal\Workflow;
+use Temporal\Workflow\WorkflowMethod;
+use Temporal\Tests\Activity\HeartBeatActivity;
+
+#[Workflow\WorkflowInterface]
+class CanceledHeartbeatWorkflow
+{
+ #[WorkflowMethod(name: 'CanceledHeartbeatWorkflow')]
+ public function handler(): iterable
+ {
+ $act = Workflow::newActivityStub(
+ HeartBeatActivity::class,
+ ActivityOptions::new()
+ ->withStartToCloseTimeout(50)
+ ->withCancellationType(ActivityCancellationType::WAIT_CANCELLATION_COMPLETED)
+ ->withHeartbeatTimeout(1)
+ );
+
+ return yield $act->slow('test');
+ }
+}
diff --git a/tests/temporal/Workflow/CancelledMidflightWorkflow.php b/tests/temporal/Workflow/CancelledMidflightWorkflow.php
new file mode 100644
index 00000000..ea799ce1
--- /dev/null
+++ b/tests/temporal/Workflow/CancelledMidflightWorkflow.php
@@ -0,0 +1,47 @@
+<?php
+
+namespace Temporal\Tests\Workflow;
+
+use Temporal\Activity\ActivityOptions;
+use Temporal\Tests\Activity\SimpleActivity;
+use Temporal\Workflow;
+use Temporal\Workflow\WorkflowMethod;
+
+#[Workflow\WorkflowInterface]
+class CancelledMidflightWorkflow
+{
+ private array $status = [];
+
+ #[Workflow\QueryMethod(name: 'getStatus')]
+ public function getStatus(): array
+ {
+ return $this->status;
+ }
+
+ #[WorkflowMethod(name: 'CancelledMidflightWorkflow')]
+ public function handler()
+ {
+ $simple = Workflow::newActivityStub(
+ SimpleActivity::class,
+ ActivityOptions::new()->withStartToCloseTimeout(5)
+ );
+
+ $this->status[] = 'start';
+
+ $scope = Workflow::newCancellationScope(
+ function () use ($simple) {
+ $this->status[] = 'in scope';
+ $simple->slow('1');
+ }
+ )->onCancel(
+ function () {
+ $this->status[] = 'on cancel';
+ }
+ );
+
+ $scope->cancel();
+ $this->status[] = 'done cancel';
+
+ return 'OK';
+ }
+}
diff --git a/tests/temporal/Workflow/CancelledNestedWorkflow.php b/tests/temporal/Workflow/CancelledNestedWorkflow.php
new file mode 100644
index 00000000..0c82f761
--- /dev/null
+++ b/tests/temporal/Workflow/CancelledNestedWorkflow.php
@@ -0,0 +1,72 @@
+<?php
+
+namespace Temporal\Tests\Workflow;
+
+use Temporal\Exception\Failure\CanceledFailure;
+use Temporal\Workflow;
+use Temporal\Workflow\WorkflowMethod;
+
+#[Workflow\WorkflowInterface]
+class CancelledNestedWorkflow
+{
+ private array $status = [];
+
+ #[Workflow\QueryMethod(name: 'getStatus')]
+ public function getStatus(): array
+ {
+ return $this->status;
+ }
+
+ #[WorkflowMethod(name: 'CancelledNestedWorkflow')]
+ public function handler()
+ {
+ $this->status[] = 'begin';
+ try {
+ yield Workflow::newCancellationScope(
+ function () {
+ $this->status[] = 'first scope';
+
+ $scope = Workflow::newCancellationScope(
+ function () {
+ $this->status[] = 'second scope';
+
+ try {
+ yield Workflow::timer(2);
+ } catch (CanceledFailure $e) {
+ $this->status[] = 'second scope cancelled';
+ throw $e;
+ }
+
+ $this->status[] = 'second scope done';
+ }
+ )->onCancel(
+ function () {
+ $this->status[] = 'close second scope';
+ }
+ );
+
+ try {
+ yield Workflow::timer(1);
+ } catch (CanceledFailure $e) {
+ $this->status[] = 'first scope cancelled';
+ throw $e;
+ }
+
+ $this->status[] = 'first scope done';
+
+ yield $scope;
+ }
+ )->onCancel(
+ function () {
+ $this->status[] = 'close first scope';
+ }
+ );
+ } catch (CanceledFailure $e) {
+ $this->status[] = 'close process';
+
+ return 'CANCELLED';
+ }
+
+ return 'OK';
+ }
+}
diff --git a/tests/temporal/Workflow/CancelledScopeWorkflow.php b/tests/temporal/Workflow/CancelledScopeWorkflow.php
new file mode 100644
index 00000000..50e0992f
--- /dev/null
+++ b/tests/temporal/Workflow/CancelledScopeWorkflow.php
@@ -0,0 +1,39 @@
+<?php
+
+namespace Temporal\Tests\Workflow;
+
+use Temporal\Activity\ActivityOptions;
+use Temporal\Workflow;
+use Temporal\Workflow\WorkflowMethod;
+use Temporal\Tests\Activity\SimpleActivity;
+
+#[Workflow\WorkflowInterface]
+class CancelledScopeWorkflow
+{
+ #[WorkflowMethod(name: 'CancelledScopeWorkflow')]
+ public function handler()
+ {
+ $simple = Workflow::newActivityStub(
+ SimpleActivity::class,
+ ActivityOptions::new()->withStartToCloseTimeout(5)
+ );
+
+ $cancelled = 'not';
+
+ $scope = Workflow::newCancellationScope(
+ function () use ($simple) {
+ yield Workflow::timer(2);
+ yield $simple->slow('hello');
+ }
+ )->onCancel(
+ function () use (&$cancelled) {
+ $cancelled = 'yes';
+ }
+ );
+
+ yield Workflow::timer(1);
+ $scope->cancel();
+
+ return $cancelled;
+ }
+}
diff --git a/tests/temporal/Workflow/CancelledSingleScopeWorkflow.php b/tests/temporal/Workflow/CancelledSingleScopeWorkflow.php
new file mode 100644
index 00000000..5fe8d3d8
--- /dev/null
+++ b/tests/temporal/Workflow/CancelledSingleScopeWorkflow.php
@@ -0,0 +1,55 @@
+<?php
+
+namespace Temporal\Tests\Workflow;
+
+use Temporal\Activity\ActivityOptions;
+use Temporal\Exception\Failure\CanceledFailure;
+use Temporal\Workflow;
+use Temporal\Workflow\WorkflowMethod;
+use Temporal\Tests\Activity\SimpleActivity;
+
+#[Workflow\WorkflowInterface]
+class CancelledSingleScopeWorkflow
+{
+ private array $status = [];
+
+ #[Workflow\QueryMethod(name: 'getStatus')]
+ public function getStatus(): array
+ {
+ return $this->status;
+ }
+
+ #[WorkflowMethod(name: 'CancelledSingleScopeWorkflow')]
+ public function handler()
+ {
+ $simple = Workflow::newActivityStub(
+ SimpleActivity::class,
+ ActivityOptions::new()
+ ->withStartToCloseTimeout(5)
+ );
+
+ $this->status[] = 'start';
+ try {
+ yield Workflow::newCancellationScope(
+ function () use ($simple) {
+ try {
+ $this->status[] = 'in scope';
+ yield $simple->slow('1');
+ } catch (CanceledFailure $e) {
+ // after process is complete, do not use for business logic
+ $this->status[] = 'captured in scope';
+ throw $e;
+ }
+ }
+ )->onCancel(
+ function () {
+ $this->status[] = 'on cancel';
+ }
+ );
+ } catch (CanceledFailure $e) {
+ $this->status[] = 'captured in process';
+ }
+
+ return 'OK';
+ }
+}
diff --git a/tests/temporal/Workflow/CancelledWithCompensationWorkflow.php b/tests/temporal/Workflow/CancelledWithCompensationWorkflow.php
new file mode 100644
index 00000000..2074aac1
--- /dev/null
+++ b/tests/temporal/Workflow/CancelledWithCompensationWorkflow.php
@@ -0,0 +1,79 @@
+<?php
+
+namespace Temporal\Tests\Workflow;
+
+use Temporal\Activity\ActivityOptions;
+use Temporal\Exception\Failure\CanceledFailure;
+use Temporal\Tests\Activity\SimpleActivity;
+use Temporal\Workflow;
+use Temporal\Workflow\WorkflowMethod;
+
+#[Workflow\WorkflowInterface]
+class CancelledWithCompensationWorkflow
+{
+ private array $status = [];
+
+ #[Workflow\QueryMethod(name: 'getStatus')]
+ public function getStatus(): array
+ {
+ return $this->status;
+ }
+
+ #[WorkflowMethod(name: 'CancelledWithCompensationWorkflow')]
+ public function handler()
+ {
+ $simple = Workflow::newActivityStub(
+ SimpleActivity::class,
+ ActivityOptions::new()->withStartToCloseTimeout(5)
+ );
+
+ // waits for 2 seconds
+ $slow = $simple->slow('DOING SLOW ACTIVITY');
+
+ try {
+ $this->status[] = 'yield';
+ $result = yield $slow;
+ } catch (CanceledFailure $e) {
+ $this->status[] = 'rollback';
+
+ try {
+ // must fail again
+ $result = yield $slow;
+ } catch (CanceledFailure $e) {
+ $this->status[] = 'captured retry';
+ }
+
+ try {
+ // fail since on cancelled context
+ $result = yield $simple->echo('echo must fail');
+ } catch (CanceledFailure $e) {
+ $this->status[] = 'captured promise on cancelled';
+ }
+
+ $scope = Workflow::newDetachedCancellationScope(
+ function () use ($simple) {
+ $this->status[] = 'START rollback';
+
+ $second = yield $simple->echo('rollback');
+
+ $this->status[] = sprintf("RESULT (%s)", $second);
+
+ if ($second !== 'ROLLBACK') {
+ $this->status[] = 'FAIL rollback';
+ return 'failed to compensate ' . $second;
+ }
+ $this->status[] = 'DONE rollback';
+
+ return 'OK';
+ }
+ );
+
+ $this->status[] = 'WAIT ROLLBACK';
+ $result = yield $scope;
+ $this->status[] = 'COMPLETE rollback';
+ }
+
+ $this->status[] = 'result: ' . $result;
+ return $result;
+ }
+}
diff --git a/tests/temporal/Workflow/CancelledWorkflow.php b/tests/temporal/Workflow/CancelledWorkflow.php
new file mode 100644
index 00000000..be9f7542
--- /dev/null
+++ b/tests/temporal/Workflow/CancelledWorkflow.php
@@ -0,0 +1,31 @@
+<?php
+
+namespace Temporal\Tests\Workflow;
+
+use Temporal\Activity\ActivityOptions;
+use Temporal\Exception\Failure\CanceledFailure;
+use Temporal\Tests\Activity\SimpleActivity;
+use Temporal\Workflow;
+use Temporal\Workflow\WorkflowMethod;
+
+#[Workflow\WorkflowInterface]
+class CancelledWorkflow
+{
+ #[WorkflowMethod(name: 'CancelledWorkflow')]
+ public function handler()
+ {
+ $simple = Workflow::newActivityStub(
+ SimpleActivity::class,
+ ActivityOptions::new()->withStartToCloseTimeout(5)
+ );
+
+ // waits for 2 seconds
+ $slow = $simple->slow('DOING SLOW ACTIVITY');
+
+ try {
+ return yield $slow;
+ } catch (CanceledFailure $e) {
+ return "CANCELLED";
+ }
+ }
+}
diff --git a/tests/temporal/Workflow/ChainedWorkflow.php b/tests/temporal/Workflow/ChainedWorkflow.php
new file mode 100644
index 00000000..ba9c8f96
--- /dev/null
+++ b/tests/temporal/Workflow/ChainedWorkflow.php
@@ -0,0 +1,31 @@
+<?php
+
+declare(strict_types=1);
+
+namespace Temporal\Tests\Workflow;
+
+use Temporal\Activity\ActivityOptions;
+use Temporal\Workflow;
+use Temporal\Workflow\WorkflowMethod;
+
+#[Workflow\WorkflowInterface]
+class ChainedWorkflow
+{
+ #[WorkflowMethod(name: 'ChainedWorkflow')]
+ public function handler(string $input): iterable
+ {
+ $opts = ActivityOptions::new()->withStartToCloseTimeout(5);
+
+ return yield Workflow::executeActivity(
+ 'SimpleActivity.echo',
+ [$input],
+ $opts
+ )->then(function ($result) use ($opts) {
+ return Workflow::executeActivity(
+ 'SimpleActivity.lower',
+ ['Result:' . $result],
+ $opts
+ );
+ });
+ }
+}
diff --git a/tests/temporal/Workflow/ChildStubWorkflow.php b/tests/temporal/Workflow/ChildStubWorkflow.php
new file mode 100644
index 00000000..608962c2
--- /dev/null
+++ b/tests/temporal/Workflow/ChildStubWorkflow.php
@@ -0,0 +1,30 @@
+<?php
+
+namespace Temporal\Tests\Workflow;
+
+use Temporal\Workflow;
+use Temporal\Workflow\WorkflowMethod;
+
+#[Workflow\WorkflowInterface]
+class ChildStubWorkflow
+{
+ #[WorkflowMethod(name: 'ChildStubWorkflow')]
+ public function handler(
+ string $input
+ ) {
+ // typed stub
+ $simple = Workflow::newChildWorkflowStub(SimpleWorkflow::class);
+
+ $result = [];
+ $result[] = yield $simple->handler($input);
+
+ // untyped
+ $untyped = Workflow::newUntypedChildWorkflowStub('SimpleWorkflow');
+ $result[] = yield $untyped->execute(['untyped']);
+
+ $execution = yield $untyped->getExecution();
+ assert($execution instanceof Workflow\WorkflowExecution);
+
+ return $result;
+ }
+}
diff --git a/tests/temporal/Workflow/ComplexExceptionalWorkflow.php b/tests/temporal/Workflow/ComplexExceptionalWorkflow.php
new file mode 100644
index 00000000..bf65ccb2
--- /dev/null
+++ b/tests/temporal/Workflow/ComplexExceptionalWorkflow.php
@@ -0,0 +1,26 @@
+<?php
+
+namespace Temporal\Tests\Workflow;
+
+use Temporal\Activity\ActivityOptions;
+use Temporal\Common\RetryOptions;
+use Temporal\Tests\Activity\SimpleActivity;
+use Temporal\Workflow;
+use Temporal\Workflow\WorkflowMethod;
+
+#[Workflow\WorkflowInterface]
+class ComplexExceptionalWorkflow
+{
+ #[WorkflowMethod(name: 'ComplexExceptionalWorkflow')]
+ public function handler()
+ {
+ $child = Workflow::newChildWorkflowStub(
+ ExceptionalActivityWorkflow::class,
+ Workflow\ChildWorkflowOptions::new()->withRetryOptions(
+ (new RetryOptions())->withMaximumAttempts(1)
+ )
+ );
+
+ return yield $child->handler();
+ }
+}
diff --git a/tests/temporal/Workflow/ContinuableWorkflow.php b/tests/temporal/Workflow/ContinuableWorkflow.php
new file mode 100644
index 00000000..78411414
--- /dev/null
+++ b/tests/temporal/Workflow/ContinuableWorkflow.php
@@ -0,0 +1,38 @@
+<?php
+
+
+namespace Temporal\Tests\Workflow;
+
+use Temporal\Activity\ActivityOptions;
+use Temporal\Workflow;
+use Temporal\Workflow\WorkflowMethod;
+use Temporal\Tests\Activity\SimpleActivity;
+
+#[Workflow\WorkflowInterface]
+class ContinuableWorkflow
+{
+ #[WorkflowMethod(name: 'ContinuableWorkflow')]
+ public function handler(
+ int $generation
+ ) {
+ $simple = Workflow::newActivityStub(
+ SimpleActivity::class,
+ ActivityOptions::new()->withStartToCloseTimeout(5)
+ );
+
+ if ($generation > 5) {
+ // complete
+ return "OK" . $generation;
+ }
+
+ if ($generation !== 1) {
+ assert(!empty(Workflow::getInfo()->continuedExecutionRunId));
+ }
+
+ for ($i = 0; $i < $generation; $i++) {
+ yield $simple->echo((string)$generation);
+ }
+
+ return Workflow::continueAsNew('ContinuableWorkflow', [++$generation]);
+ }
+}
diff --git a/tests/temporal/Workflow/EmptyWorkflow.php b/tests/temporal/Workflow/EmptyWorkflow.php
new file mode 100644
index 00000000..57fb5e65
--- /dev/null
+++ b/tests/temporal/Workflow/EmptyWorkflow.php
@@ -0,0 +1,16 @@
+<?php
+
+namespace Temporal\Tests\Workflow;
+
+use Temporal\Workflow\WorkflowMethod;
+use Temporal\Workflow;
+
+#[Workflow\WorkflowInterface]
+class EmptyWorkflow
+{
+ #[WorkflowMethod]
+ public function handler()
+ {
+ return 42;
+ }
+}
diff --git a/tests/temporal/Workflow/ExceptionalActivityWorkflow.php b/tests/temporal/Workflow/ExceptionalActivityWorkflow.php
new file mode 100644
index 00000000..e0ed0005
--- /dev/null
+++ b/tests/temporal/Workflow/ExceptionalActivityWorkflow.php
@@ -0,0 +1,25 @@
+<?php
+
+namespace Temporal\Tests\Workflow;
+
+use Temporal\Activity\ActivityOptions;
+use Temporal\Common\RetryOptions;
+use Temporal\Tests\Activity\SimpleActivity;
+use Temporal\Workflow;
+use Temporal\Workflow\WorkflowMethod;
+
+#[Workflow\WorkflowInterface]
+class ExceptionalActivityWorkflow
+{
+ #[WorkflowMethod(name: 'ExceptionalActivityWorkflow')]
+ public function handler()
+ {
+ $simple = Workflow::newActivityStub(
+ SimpleActivity::class,
+ ActivityOptions::new()->withStartToCloseTimeout(5)
+ ->withRetryOptions((new RetryOptions())->withMaximumAttempts(1))
+ );
+
+ return yield $simple->fail();
+ }
+}
diff --git a/tests/temporal/Workflow/ExceptionalWorkflow.php b/tests/temporal/Workflow/ExceptionalWorkflow.php
new file mode 100644
index 00000000..9a3e907f
--- /dev/null
+++ b/tests/temporal/Workflow/ExceptionalWorkflow.php
@@ -0,0 +1,18 @@
+<?php
+
+namespace Temporal\Tests\Workflow;
+
+use Temporal\Activity\ActivityOptions;
+use Temporal\Tests\Activity\SimpleActivity;
+use Temporal\Workflow;
+use Temporal\Workflow\WorkflowMethod;
+
+#[Workflow\WorkflowInterface]
+class ExceptionalWorkflow
+{
+ #[WorkflowMethod(name: 'ExceptionalWorkflow')]
+ public function handler()
+ {
+ throw new \RuntimeException("workflow error");
+ }
+}
diff --git a/tests/temporal/Workflow/FailedHeartbeatWorkflow.php b/tests/temporal/Workflow/FailedHeartbeatWorkflow.php
new file mode 100644
index 00000000..e857f100
--- /dev/null
+++ b/tests/temporal/Workflow/FailedHeartbeatWorkflow.php
@@ -0,0 +1,30 @@
+<?php
+
+declare(strict_types=1);
+
+namespace Temporal\Tests\Workflow;
+
+use Temporal\Activity\ActivityOptions;
+use Temporal\Common\RetryOptions;
+use Temporal\Workflow;
+use Temporal\Workflow\WorkflowMethod;
+use Temporal\Tests\Activity\HeartBeatActivity;
+
+#[Workflow\WorkflowInterface]
+class FailedHeartbeatWorkflow
+{
+ #[WorkflowMethod(name: 'FailedHeartbeatWorkflow')]
+ public function handler(
+ int $iterations
+ ): iterable {
+ $act = Workflow::newActivityStub(
+ HeartBeatActivity::class,
+ ActivityOptions::new()
+ ->withStartToCloseTimeout(50)
+ // will fail on first attempt
+ ->withRetryOptions(RetryOptions::new()->withMaximumAttempts(2))
+ );
+
+ return yield $act->failedActivity($iterations);
+ }
+}
diff --git a/tests/temporal/Workflow/LoopWithSignalCoroutinesWorkflow.php b/tests/temporal/Workflow/LoopWithSignalCoroutinesWorkflow.php
new file mode 100644
index 00000000..c389fd78
--- /dev/null
+++ b/tests/temporal/Workflow/LoopWithSignalCoroutinesWorkflow.php
@@ -0,0 +1,55 @@
+<?php
+
+namespace Temporal\Tests\Workflow;
+
+use Temporal\Activity\ActivityOptions;
+use Temporal\Tests\Activity\SimpleActivity;
+use Temporal\Workflow;
+use Temporal\Workflow\SignalMethod;
+use Temporal\Workflow\WorkflowMethod;
+
+#[Workflow\WorkflowInterface]
+class LoopWithSignalCoroutinesWorkflow
+{
+ private array $values = [];
+ private array $result = [];
+ private $simple;
+
+ public function __construct()
+ {
+ $this->simple = Workflow::newActivityStub(
+ SimpleActivity::class,
+ ActivityOptions::new()->withStartToCloseTimeout(5)
+ );
+ }
+
+ #[SignalMethod]
+ public function addValue(
+ string $value
+ ) {
+ $value = yield $this->simple->prefix('in signal ', $value);
+ $value = yield $this->simple->prefix('in signal 2 ', $value);
+
+ $this->values[] = $value;
+ }
+
+ #[WorkflowMethod(name: 'LoopWithSignalCoroutinesWorkflow')]
+ public function run(
+ int $count
+ ) {
+ while (true) {
+ yield Workflow::await(fn() => $this->values !== []);
+ $value = array_shift($this->values);
+
+ // uppercases
+ $this->result[] = yield $this->simple->echo($value);
+
+ if (count($this->result) === $count) {
+ break;
+ }
+ }
+
+ asort($this->result);
+ return array_values($this->result);
+ }
+}
diff --git a/tests/temporal/Workflow/LoopWorkflow.php b/tests/temporal/Workflow/LoopWorkflow.php
new file mode 100644
index 00000000..97d7a3aa
--- /dev/null
+++ b/tests/temporal/Workflow/LoopWorkflow.php
@@ -0,0 +1,51 @@
+<?php
+
+
+namespace Temporal\Tests\Workflow;
+
+use Temporal\Activity\ActivityOptions;
+use Temporal\Tests\Activity\SimpleActivity;
+use Temporal\Workflow;
+use Temporal\Workflow\SignalMethod;
+use Temporal\Workflow\WorkflowMethod;
+
+#[Workflow\WorkflowInterface]
+class LoopWorkflow
+{
+ private array $values = [];
+ private array $result = [];
+ private $simple;
+
+ public function __construct()
+ {
+ $this->simple = Workflow::newActivityStub(
+ SimpleActivity::class,
+ ActivityOptions::new()->withStartToCloseTimeout(5)
+ );
+ }
+
+ #[SignalMethod]
+ public function addValue(
+ string $value
+ ) {
+ $this->values[] = $value;
+ }
+
+ #[WorkflowMethod(name: 'LoopWorkflow')]
+ public function run(
+ int $count
+ ) {
+ while (true) {
+ yield Workflow::await(fn() => $this->values !== []);
+ $value = array_shift($this->values);
+
+ $this->result[] = yield $this->simple->echo($value);
+
+ if (count($this->result) === $count) {
+ break;
+ }
+ }
+
+ return $this->result;
+ }
+}
diff --git a/tests/temporal/Workflow/ParallelScopesWorkflow.php b/tests/temporal/Workflow/ParallelScopesWorkflow.php
new file mode 100644
index 00000000..8a2303f4
--- /dev/null
+++ b/tests/temporal/Workflow/ParallelScopesWorkflow.php
@@ -0,0 +1,36 @@
+<?php
+
+declare(strict_types=1);
+
+namespace Temporal\Tests\Workflow;
+
+use Temporal\Activity\ActivityOptions;
+use Temporal\Promise;
+use Temporal\Workflow;
+use Temporal\Workflow\WorkflowMethod;
+use Temporal\Tests\Activity\SimpleActivity;
+
+#[Workflow\WorkflowInterface]
+class ParallelScopesWorkflow
+{
+ #[WorkflowMethod(name: 'ParallelScopesWorkflow')]
+ public function handler(string $input)
+ {
+ $simple = Workflow::newActivityStub(
+ SimpleActivity::class,
+ ActivityOptions::new()->withStartToCloseTimeout(5)
+ );
+
+ $a = Workflow::newCancellationScope(function () use ($simple, $input) {
+ return yield $simple->echo($input);
+ });
+
+ $b = Workflow::newCancellationScope(function () use ($simple, $input) {
+ return yield $simple->lower($input);
+ });
+
+ [$ra, $rb] = yield Promise::all([$a, $b]);
+
+ return sprintf('%s|%s|%s', $ra, $input, $rb);
+ }
+}
diff --git a/tests/temporal/Workflow/PeriodicWorkflow.php b/tests/temporal/Workflow/PeriodicWorkflow.php
new file mode 100644
index 00000000..08f5f2fa
--- /dev/null
+++ b/tests/temporal/Workflow/PeriodicWorkflow.php
@@ -0,0 +1,19 @@
+<?php
+
+namespace Temporal\Tests\Workflow;
+
+use Temporal\Workflow;
+use Temporal\Workflow\WorkflowMethod;
+
+#[Workflow\WorkflowInterface]
+class PeriodicWorkflow
+{
+ #[WorkflowMethod(name: 'PeriodicWorkflow')]
+ public function handler()
+ {
+ error_log("GOT SOMETHING" . print_r(Workflow::getLastCompletionResult(), true));
+
+ // todo: get last completion result
+ return 'OK';
+ }
+}
diff --git a/tests/temporal/Workflow/ProtoPayloadWorkflow.php b/tests/temporal/Workflow/ProtoPayloadWorkflow.php
new file mode 100644
index 00000000..7adbed1e
--- /dev/null
+++ b/tests/temporal/Workflow/ProtoPayloadWorkflow.php
@@ -0,0 +1,33 @@
+<?php
+
+namespace Temporal\Tests\Workflow;
+
+use Temporal\Activity\ActivityOptions;
+use Temporal\Api\Common\V1\WorkflowExecution;
+use Temporal\Tests\Activity\SimpleActivity;
+use Temporal\Workflow;
+use Temporal\Workflow\WorkflowMethod;
+
+#[Workflow\WorkflowInterface]
+class ProtoPayloadWorkflow
+{
+ #[WorkflowMethod(name: 'ProtoPayloadWorkflow')]
+ public function handler(): iterable
+ {
+ $simple = Workflow::newActivityStub(
+ SimpleActivity::class,
+ ActivityOptions::new()->withStartToCloseTimeout(5)
+ );
+
+ $e = new WorkflowExecution();
+ $e->setWorkflowId('workflow id');
+ $e->setRunId('run id');
+
+ /** @var WorkflowExecution $e2 */
+ $e2 = yield $simple->updateRunID($e);
+ assert($e2->getWorkflowId() === $e->getWorkflowId());
+ assert($e2->getRunId() === 'updated');
+
+ return $e2;
+ }
+}
diff --git a/tests/temporal/Workflow/QueryWorkflow.php b/tests/temporal/Workflow/QueryWorkflow.php
new file mode 100644
index 00000000..96e41582
--- /dev/null
+++ b/tests/temporal/Workflow/QueryWorkflow.php
@@ -0,0 +1,41 @@
+<?php
+
+/**
+ * This file is part of Temporal package.
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Temporal\Tests\Workflow;
+
+use Temporal\Workflow;
+use Temporal\Workflow\WorkflowMethod;
+
+#[Workflow\WorkflowInterface]
+class QueryWorkflow
+{
+ private int $counter = 0;
+
+ #[Workflow\SignalMethod(name: "add")]
+ public function add(
+ int $value
+ ) {
+ $this->counter += $value;
+ }
+
+ #[Workflow\QueryMethod(name: "get")]
+ public function get(): int
+ {
+ return $this->counter;
+ }
+
+ #[WorkflowMethod]
+ public function handler()
+ {
+ // collect signals during one second
+ yield Workflow::timer(1);
+
+ return $this->counter;
+ }
+}
diff --git a/tests/temporal/Workflow/RuntimeSignalWorkflow.php b/tests/temporal/Workflow/RuntimeSignalWorkflow.php
new file mode 100644
index 00000000..f700af72
--- /dev/null
+++ b/tests/temporal/Workflow/RuntimeSignalWorkflow.php
@@ -0,0 +1,31 @@
+<?php
+
+namespace Temporal\Tests\Workflow;
+
+use React\Promise\Deferred;
+use Temporal\Workflow;
+use Temporal\Workflow\WorkflowMethod;
+
+#[Workflow\WorkflowInterface]
+class RuntimeSignalWorkflow
+{
+ #[WorkflowMethod]
+ public function handler()
+ {
+ $wait1 = new Deferred();
+ $wait2 = new Deferred();
+
+ $counter = 0;
+
+ Workflow::registerSignal('add', function ($value) use (&$counter, $wait1, $wait2) {
+ $counter += $value;
+ $wait1->resolve($value);
+ $wait2->resolve($value);
+ });
+
+ yield $wait1;
+ yield $wait2;
+
+ return $counter;
+ }
+}
diff --git a/tests/temporal/Workflow/SagaWorkflow.php b/tests/temporal/Workflow/SagaWorkflow.php
new file mode 100644
index 00000000..e47c0203
--- /dev/null
+++ b/tests/temporal/Workflow/SagaWorkflow.php
@@ -0,0 +1,54 @@
+<?php
+
+/**
+ * This file is part of Temporal package.
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Temporal\Tests\Workflow;
+
+use Temporal\Activity\ActivityOptions;
+use Temporal\Common\RetryOptions;
+use Temporal\Tests\Activity\SimpleActivity;
+use Temporal\Workflow;
+
+#[Workflow\WorkflowInterface]
+class SagaWorkflow
+{
+ #[Workflow\WorkflowMethod(name: 'SagaWorkflow')]
+ public function run()
+ {
+ $simple = Workflow::newActivityStub(
+ SimpleActivity::class,
+ ActivityOptions::new()
+ ->withStartToCloseTimeout(60)
+ ->withRetryOptions(RetryOptions::new()->withMaximumAttempts(1))
+ );
+
+ $saga = new Workflow\Saga();
+ $saga->setParallelCompensation(true);
+
+ try {
+ yield $simple->echo('test');
+ $saga->addCompensation(
+ function () use ($simple) {
+ yield $simple->echo('compensate echo');
+ }
+ );
+
+ yield $simple->lower('TEST');
+ $saga->addCompensation(
+ function () use ($simple) {
+ yield $simple->lower('COMPENSATE LOWER');
+ }
+ );
+
+ yield $simple->fail();
+ } catch (\Throwable $e) {
+ yield $saga->compensate();
+ throw $e;
+ }
+ }
+}
diff --git a/tests/temporal/Workflow/SideEffectWorkflow.php b/tests/temporal/Workflow/SideEffectWorkflow.php
new file mode 100644
index 00000000..95d396e4
--- /dev/null
+++ b/tests/temporal/Workflow/SideEffectWorkflow.php
@@ -0,0 +1,30 @@
+<?php
+
+namespace Temporal\Tests\Workflow;
+
+use Temporal\Activity\ActivityOptions;
+use Temporal\Common\Uuid;
+use Temporal\Workflow;
+use Temporal\Workflow\WorkflowMethod;
+use Temporal\Tests\Activity\SimpleActivity;
+
+#[Workflow\WorkflowInterface]
+class SideEffectWorkflow
+{
+ #[WorkflowMethod(name: 'SideEffectWorkflow')]
+ public function handler(string $input): iterable
+ {
+ $simple = Workflow::newActivityStub(
+ SimpleActivity::class,
+ ActivityOptions::new()->withStartToCloseTimeout(5)
+ );
+
+ $result = yield Workflow::sideEffect(
+ function () use ($input) {
+ return $input . '-42';
+ }
+ );
+
+ return yield $simple->lower($result);
+ }
+}
diff --git a/tests/temporal/Workflow/SignalChildViaStubWorkflow.php b/tests/temporal/Workflow/SignalChildViaStubWorkflow.php
new file mode 100644
index 00000000..828086fc
--- /dev/null
+++ b/tests/temporal/Workflow/SignalChildViaStubWorkflow.php
@@ -0,0 +1,25 @@
+<?php
+
+namespace Temporal\Tests\Workflow;
+
+use Temporal\Workflow;
+use Temporal\Workflow\WorkflowMethod;
+
+#[Workflow\WorkflowInterface]
+class SignalChildViaStubWorkflow
+{
+ #[WorkflowMethod(name: 'SignalChildViaStubWorkflow')]
+ public function handler()
+ {
+ // typed stub
+ $simple = Workflow::newChildWorkflowStub(SimpleSignalledWorkflow::class);
+
+ // start execution
+ $call = $simple->handler();
+
+ yield $simple->add(8);
+
+ // expects 8
+ return yield $call;
+ }
+}
diff --git a/tests/temporal/Workflow/SimpleDTOWorkflow.php b/tests/temporal/Workflow/SimpleDTOWorkflow.php
new file mode 100644
index 00000000..bd39a0a0
--- /dev/null
+++ b/tests/temporal/Workflow/SimpleDTOWorkflow.php
@@ -0,0 +1,35 @@
+<?php
+
+declare(strict_types=1);
+
+namespace Temporal\Tests\Workflow;
+
+use Temporal\Activity\ActivityOptions;
+use Temporal\Workflow;
+use Temporal\Workflow\WorkflowMethod;
+use Temporal\Tests\Activity\SimpleActivity;
+use Temporal\Tests\DTO\Message;
+use Temporal\Tests\DTO\User;
+
+#[Workflow\WorkflowInterface]
+class SimpleDTOWorkflow
+{
+ #[WorkflowMethod(name: 'SimpleDTOWorkflow')]//, returnType: Message::class)]
+ public function handler(
+ User $user
+ ) {
+ $simple = Workflow::newActivityStub(
+ SimpleActivity::class,
+ ActivityOptions::new()
+ ->withStartToCloseTimeout(5)
+ );
+
+ $value = yield $simple->greet($user);
+
+ if (!$value instanceof Message) {
+ return "FAIL";
+ }
+
+ return $value;
+ }
+}
diff --git a/tests/temporal/Workflow/SimpleHeartbeatWorkflow.php b/tests/temporal/Workflow/SimpleHeartbeatWorkflow.php
new file mode 100644
index 00000000..c9999cd1
--- /dev/null
+++ b/tests/temporal/Workflow/SimpleHeartbeatWorkflow.php
@@ -0,0 +1,25 @@
+<?php
+
+declare(strict_types=1);
+
+namespace Temporal\Tests\Workflow;
+
+use Temporal\Activity\ActivityOptions;
+use Temporal\Workflow;
+use Temporal\Workflow\WorkflowMethod;
+use Temporal\Tests\Activity\HeartBeatActivity;
+
+#[Workflow\WorkflowInterface]
+class SimpleHeartbeatWorkflow
+{
+ #[WorkflowMethod(name: 'SimpleHeartbeatWorkflow')]
+ public function handler(int $iterations): iterable
+ {
+ $act = Workflow::newActivityStub(
+ HeartBeatActivity::class,
+ ActivityOptions::new()->withStartToCloseTimeout(50)
+ );
+
+ return yield $act->doSomething($iterations);
+ }
+}
diff --git a/tests/temporal/Workflow/SimpleSignalledWorkflow.php b/tests/temporal/Workflow/SimpleSignalledWorkflow.php
new file mode 100644
index 00000000..0df25a65
--- /dev/null
+++ b/tests/temporal/Workflow/SimpleSignalledWorkflow.php
@@ -0,0 +1,30 @@
+<?php
+
+declare(strict_types=1);
+
+namespace Temporal\Tests\Workflow;
+
+use Temporal\Workflow;
+use Temporal\Workflow\WorkflowMethod;
+
+#[Workflow\WorkflowInterface]
+class SimpleSignalledWorkflow
+{
+ private $counter = 0;
+
+ #[Workflow\SignalMethod(name: "add")]
+ public function add(
+ int $value
+ ) {
+ $this->counter += $value;
+ }
+
+ #[WorkflowMethod(name: 'SimpleSignalledWorkflow')]
+ public function handler(): iterable
+ {
+ // collect signals during one second
+ yield Workflow::timer(1);
+
+ return $this->counter;
+ }
+}
diff --git a/tests/temporal/Workflow/SimpleSignalledWorkflowWithSleep.php b/tests/temporal/Workflow/SimpleSignalledWorkflowWithSleep.php
new file mode 100644
index 00000000..d10ba04a
--- /dev/null
+++ b/tests/temporal/Workflow/SimpleSignalledWorkflowWithSleep.php
@@ -0,0 +1,34 @@
+<?php
+
+declare(strict_types=1);
+
+namespace Temporal\Tests\Workflow;
+
+use Temporal\Workflow;
+use Temporal\Workflow\WorkflowMethod;
+
+#[Workflow\WorkflowInterface]
+class SimpleSignalledWorkflowWithSleep
+{
+ private $counter = 0;
+
+ #[Workflow\SignalMethod(name: "add")]
+ public function add(
+ int $value
+ ) {
+ $this->counter += $value;
+ }
+
+ #[WorkflowMethod(name: 'SimpleSignalledWorkflowWithSleep')]
+ public function handler(): iterable
+ {
+ // collect signals during one second
+ yield Workflow::timer(1);
+
+ if (!Workflow::isReplaying()) {
+ sleep(1);
+ }
+
+ return $this->counter;
+ }
+}
diff --git a/tests/temporal/Workflow/SimpleWorkflow.php b/tests/temporal/Workflow/SimpleWorkflow.php
new file mode 100644
index 00000000..36e12f69
--- /dev/null
+++ b/tests/temporal/Workflow/SimpleWorkflow.php
@@ -0,0 +1,31 @@
+<?php
+
+declare(strict_types=1);
+
+namespace Temporal\Tests\Workflow;
+
+use Temporal\Activity\ActivityOptions;
+use Temporal\Common\RetryOptions;
+use Temporal\Workflow;
+use Temporal\Workflow\WorkflowMethod;
+use Temporal\Tests\Activity\SimpleActivity;
+
+#[Workflow\WorkflowInterface]
+class SimpleWorkflow
+{
+ #[WorkflowMethod(name: 'SimpleWorkflow')]
+ public function handler(
+ string $input
+ ): iterable {
+ $simple = Workflow::newActivityStub(
+ SimpleActivity::class,
+ ActivityOptions::new()
+ ->withStartToCloseTimeout(5)
+ ->withRetryOptions(
+ RetryOptions::new()->withMaximumAttempts(2)
+ )
+ );
+
+ return yield $simple->echo($input);
+ }
+}
diff --git a/tests/temporal/Workflow/TimerWorkflow.php b/tests/temporal/Workflow/TimerWorkflow.php
new file mode 100644
index 00000000..ab60d6c9
--- /dev/null
+++ b/tests/temporal/Workflow/TimerWorkflow.php
@@ -0,0 +1,27 @@
+<?php
+
+declare(strict_types=1);
+
+namespace Temporal\Tests\Workflow;
+
+use Temporal\Activity\ActivityOptions;
+use Temporal\Workflow;
+use Temporal\Workflow\WorkflowMethod;
+use Temporal\Tests\Activity\SimpleActivity;
+
+#[Workflow\WorkflowInterface]
+class TimerWorkflow
+{
+ #[WorkflowMethod(name: 'TimerWorkflow')]
+ public function handler(string $input): iterable
+ {
+ $simple = Workflow::newActivityStub(
+ SimpleActivity::class,
+ ActivityOptions::new()->withStartToCloseTimeout(5)
+ );
+
+ yield Workflow::timer(1);
+
+ return yield $simple->lower($input);
+ }
+}
diff --git a/tests/temporal/Workflow/WaitWorkflow.php b/tests/temporal/Workflow/WaitWorkflow.php
new file mode 100644
index 00000000..826952c1
--- /dev/null
+++ b/tests/temporal/Workflow/WaitWorkflow.php
@@ -0,0 +1,33 @@
+<?php
+
+
+namespace Temporal\Tests\Workflow;
+
+
+use Temporal\Workflow;
+use Temporal\Workflow\SignalMethod;
+use Temporal\Workflow\WorkflowInterface;
+use Temporal\Workflow\WorkflowMethod;
+
+#[WorkflowInterface]
+class WaitWorkflow
+{
+ private bool $ready = false;
+ private string $value;
+
+ #[SignalMethod]
+ public function unlock(
+ string $value
+ ) {
+ $this->ready = true;
+ $this->value = $value;
+ }
+
+ #[WorkflowMethod(name: 'WaitWorkflow')]
+ public function run()
+ {
+ yield Workflow::await(fn() => $this->ready);
+
+ return $this->value;
+ }
+}
diff --git a/tests/temporal/Workflow/WithChildStubWorkflow.php b/tests/temporal/Workflow/WithChildStubWorkflow.php
new file mode 100644
index 00000000..cdebe3d8
--- /dev/null
+++ b/tests/temporal/Workflow/WithChildStubWorkflow.php
@@ -0,0 +1,20 @@
+<?php
+
+declare(strict_types=1);
+
+namespace Temporal\Tests\Workflow;
+
+use Temporal\Workflow;
+use Temporal\Workflow\WorkflowMethod;
+
+#[Workflow\WorkflowInterface]
+class WithChildStubWorkflow
+{
+ #[WorkflowMethod(name: 'WithChildStubWorkflow')]
+ public function handler(string $input): iterable
+ {
+ $child = Workflow::newChildWorkflowStub(SimpleWorkflow::class);
+
+ return 'Child: ' . (yield $child->handler('child ' . $input));
+ }
+}
diff --git a/tests/temporal/Workflow/WithChildWorkflow.php b/tests/temporal/Workflow/WithChildWorkflow.php
new file mode 100644
index 00000000..aac0979b
--- /dev/null
+++ b/tests/temporal/Workflow/WithChildWorkflow.php
@@ -0,0 +1,25 @@
+<?php
+
+declare(strict_types=1);
+
+namespace Temporal\Tests\Workflow;
+
+use Temporal\Workflow;
+use Temporal\Workflow\WorkflowMethod;
+
+#[Workflow\WorkflowInterface]
+class WithChildWorkflow
+{
+ #[WorkflowMethod(name: 'WithChildWorkflow')]
+ public function handler(
+ string $input
+ ): iterable {
+ $result = yield Workflow::executeChildWorkflow(
+ 'SimpleWorkflow',
+ ['child ' . $input],
+ Workflow\ChildWorkflowOptions::new()
+ );
+
+ return 'Child: ' . $result;
+ }
+}
diff --git a/tests/temporal/Workflow/WorkflowWithSequence.php b/tests/temporal/Workflow/WorkflowWithSequence.php
new file mode 100644
index 00000000..9e813a9c
--- /dev/null
+++ b/tests/temporal/Workflow/WorkflowWithSequence.php
@@ -0,0 +1,30 @@
+<?php
+
+
+namespace Temporal\Tests\Workflow;
+
+use Temporal\Activity\ActivityOptions;
+use Temporal\Workflow;
+use Temporal\Workflow\WorkflowMethod;
+use Temporal\Tests\Activity\SimpleActivity;
+
+#[Workflow\WorkflowInterface]
+class WorkflowWithSequence
+{
+ #[WorkflowMethod(name: 'WorkflowWithSequence')]
+ public function handler()
+ {
+ $simple = Workflow::newActivityStub(
+ SimpleActivity::class,
+ ActivityOptions::new()->withStartToCloseTimeout(5)
+ );
+
+ $a = $simple->echo('a');
+ $b = $simple->echo('b');
+
+ yield $a;
+ yield $b;
+
+ return 'OK';
+ }
+}
diff --git a/tests/temporal/Workflow/WorkflowWithSignalledSteps.php b/tests/temporal/Workflow/WorkflowWithSignalledSteps.php
new file mode 100644
index 00000000..5f1af766
--- /dev/null
+++ b/tests/temporal/Workflow/WorkflowWithSignalledSteps.php
@@ -0,0 +1,51 @@
+<?php
+
+
+namespace Temporal\Tests\Workflow;
+
+use React\Promise\Deferred;
+use React\Promise\PromiseInterface;
+use Temporal\Activity\ActivityOptions;
+use Temporal\Workflow;
+use Temporal\Workflow\WorkflowMethod;
+use Temporal\Tests\Activity\SimpleActivity;
+
+#[Workflow\WorkflowInterface]
+class WorkflowWithSignalledSteps
+{
+ #[WorkflowMethod(name: 'WorkflowWithSignalledSteps')]
+ public function handler()
+ {
+ $simple = Workflow::newActivityStub(
+ SimpleActivity::class,
+ ActivityOptions::new()->withStartToCloseTimeout(5)
+ );
+
+ $value = 0;
+ Workflow::registerQuery('value', function () use (&$value) {
+ return $value;
+ });
+
+ yield $this->promiseSignal('begin');
+ $value++;
+
+ yield $this->promiseSignal('next1');
+ $value++;
+
+ yield $this->promiseSignal('next2');
+ $value++;
+
+ return $value;
+ }
+
+ // is this correct?
+ private function promiseSignal(string $name): PromiseInterface
+ {
+ $signal = new Deferred();
+ Workflow::registerSignal($name, function ($value) use ($signal) {
+ $signal->resolve($value);
+ });
+
+ return $signal->promise();
+ }
+}