diff options
author | Valery Piashchynski <[email protected]> | 2021-01-26 11:52:03 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2021-01-26 11:52:03 +0300 |
commit | e2266b80db47444ba5858c736833a8a81b1361ad (patch) | |
tree | 37e06810352752f88032f7d0eadb554fa18b98da | |
parent | fae4711e3548bfd2e34f13aabfaab6a5b4e317c6 (diff) | |
parent | a392d962508e1bc9e497c8c4ef021425bc2c67c2 (diff) |
Merge pull request #502 from spiral/plugin/temporalv2.0.0-beta12
plugin(temporal): Add temporal plugins set to the RR2
89 files changed, 6111 insertions, 78 deletions
diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index 27df293a..076de74c 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -69,6 +69,9 @@ jobs: go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pool.txt -covermode=atomic ./pkg/pool go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker.txt -covermode=atomic ./pkg/worker go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker_stack.txt -covermode=atomic ./pkg/worker_watcher + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/temporal.out -covermode=atomic ./tests/plugins/temporal + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/temporal_protocol.out -covermode=atomic ./plugins/temporal/protocol + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/temporal_workflow.out -covermode=atomic ./plugins/temporal/workflow go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/http_config.txt -covermode=atomic ./plugins/http/config go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/http.txt -covermode=atomic ./tests/plugins/http go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/informer.txt -covermode=atomic ./tests/plugins/informer diff --git a/.github/workflows/macos.yml b/.github/workflows/macos.yml index 77f9cfda..00b51598 100644 --- a/.github/workflows/macos.yml +++ b/.github/workflows/macos.yml @@ -67,6 +67,8 @@ jobs: go test -v -race -tags=debug ./pkg/pool go test -v -race -tags=debug ./pkg/worker go test -v -race -tags=debug ./pkg/worker_watcher + go test -v -race -tags=debug ./plugins/temporal/protocol + go test -v -race -tags=debug ./plugins/temporal/workflow go test -v -race -tags=debug ./plugins/http/config go test -v -race -tags=debug ./tests/plugins/http go test -v -race -tags=debug ./tests/plugins/informer @@ -15,6 +15,14 @@ logs: mode: development level: error +# Workflow and activity mesh service +temporal: + address: localhost:7233 + activities: + num_workers: 4 + codec: json + debug_level: 2 + http: # host and port separated by semicolon address: 127.0.0.1:44933 @@ -32,6 +32,9 @@ test_coverage: go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/pool.out -covermode=atomic ./pkg/pool go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/worker.out -covermode=atomic ./pkg/worker go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/worker_stack.out -covermode=atomic ./pkg/worker_watcher + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/temporal.out -covermode=atomic ./tests/plugins/temporal + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/temporal_protocol.out -covermode=atomic ./plugins/temporal/protocol + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/temporal_workflow.out -covermode=atomic ./plugins/temporal/workflow go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/http.out -covermode=atomic ./tests/plugins/http go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/informer.out -covermode=atomic ./tests/plugins/informer go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/reload.out -covermode=atomic ./tests/plugins/reload @@ -58,62 +61,68 @@ test_coverage: test: ## Run application tests docker-compose -f tests/docker-compose.yaml up -d - go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/transport/pipe - go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/transport/socket - go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/pool - go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/worker - go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/worker_watcher - go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/http - go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./plugins/http/config - go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/informer - go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/reload - go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/server - go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/checker - go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/config - go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/gzip - go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/headers - go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/logger - go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/metrics - go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/redis - go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/resetter - go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/rpc - go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/static - go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./plugins/kv/boltdb - go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./plugins/kv/memory - go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./plugins/kv/memcached - go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/kv/boltdb - go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/kv/memory - go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/kv/memcached + go test -v -race -tags=debug ./pkg/transport/pipe + go test -v -race -tags=debug ./pkg/transport/socket + go test -v -race -tags=debug ./pkg/pool + go test -v -race -tags=debug ./pkg/worker + go test -v -race -tags=debug ./pkg/worker_watcher + go test -v -race -tags=debug ./tests/plugins/temporal + go test -v -race -tags=debug ./plugins/temporal/protocol + go test -v -race -tags=debug ./plugins/temporal/workflow + go test -v -race -tags=debug ./tests/plugins/http + go test -v -race -tags=debug ./plugins/http/config + go test -v -race -tags=debug ./tests/plugins/informer + go test -v -race -tags=debug ./tests/plugins/reload + go test -v -race -tags=debug ./tests/plugins/server + go test -v -race -tags=debug ./tests/plugins/checker + go test -v -race -tags=debug ./tests/plugins/config + go test -v -race -tags=debug ./tests/plugins/gzip + go test -v -race -tags=debug ./tests/plugins/headers + go test -v -race -tags=debug ./tests/plugins/logger + go test -v -race -tags=debug ./tests/plugins/metrics + go test -v -race -tags=debug ./tests/plugins/redis + go test -v -race -tags=debug ./tests/plugins/resetter + go test -v -race -tags=debug ./tests/plugins/rpc + go test -v -race -tags=debug ./tests/plugins/static + go test -v -race -tags=debug ./plugins/kv/boltdb + go test -v -race -tags=debug ./plugins/kv/memory + go test -v -race -tags=debug ./plugins/kv/memcached + go test -v -race -tags=debug ./tests/plugins/kv/boltdb + go test -v -race -tags=debug ./tests/plugins/kv/memory + go test -v -race -tags=debug ./tests/plugins/kv/memcached docker-compose -f tests/docker-compose.yaml down test_1.14: ## Run application tests docker-compose -f tests/docker-compose.yaml up -d - go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/transport/pipe - go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/transport/socket - go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/pool - go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/worker - go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/worker_watcher - go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/http - go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./plugins/http/config - go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/informer - go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/reload - go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/server - go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/checker - go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/config - go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/gzip - go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/headers - go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/logger - go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/metrics - go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/redis - go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/resetter - go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/rpc - go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/static - go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./plugins/kv/boltdb - go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./plugins/kv/memory - go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./plugins/kv/memcached - go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/kv/boltdb - go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/kv/memory - go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/kv/memcached + go1.14.14 test -v -race -tags=debug ./pkg/transport/pipe + go1.14.14 test -v -race -tags=debug ./pkg/transport/socket + go1.14.14 test -v -race -tags=debug ./pkg/pool + go1.14.14 test -v -race -tags=debug ./pkg/worker + go1.14.14 test -v -race -tags=debug ./pkg/worker_watcher + go1.14.14 test -v -race -tags=debug ./tests/plugins/temporal + go1.14.14 test -v -race -tags=debug ./plugins/temporal/protocol + go1.14.14 test -v -race -tags=debug ./plugins/temporal/workflow + go1.14.14 test -v -race -tags=debug ./tests/plugins/http + go1.14.14 test -v -race -tags=debug ./plugins/http/config + go1.14.14 test -v -race -tags=debug ./tests/plugins/informer + go1.14.14 test -v -race -tags=debug ./tests/plugins/reload + go1.14.14 test -v -race -tags=debug ./tests/plugins/server + go1.14.14 test -v -race -tags=debug ./tests/plugins/checker + go1.14.14 test -v -race -tags=debug ./tests/plugins/config + go1.14.14 test -v -race -tags=debug ./tests/plugins/gzip + go1.14.14 test -v -race -tags=debug ./tests/plugins/headers + go1.14.14 test -v -race -tags=debug ./tests/plugins/logger + go1.14.14 test -v -race -tags=debug ./tests/plugins/metrics + go1.14.14 test -v -race -tags=debug ./tests/plugins/redis + go1.14.14 test -v -race -tags=debug ./tests/plugins/resetter + go1.14.14 test -v -race -tags=debug ./tests/plugins/rpc + go1.14.14 test -v -race -tags=debug ./tests/plugins/static + go1.14.14 test -v -race -tags=debug ./plugins/kv/boltdb + go1.14.14 test -v -race -tags=debug ./plugins/kv/memory + go1.14.14 test -v -race -tags=debug ./plugins/kv/memcached + go1.14.14 test -v -race -tags=debug ./tests/plugins/kv/boltdb + go1.14.14 test -v -race -tags=debug ./tests/plugins/kv/memory + go1.14.14 test -v -race -tags=debug ./tests/plugins/kv/memcached docker-compose -f tests/docker-compose.yaml down test_pipeline: test_1.14 test diff --git a/cmd/main.go b/cmd/main.go index 1dd19107..8074f316 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -7,6 +7,9 @@ import ( "github.com/spiral/roadrunner/v2/cmd/cli" httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" "github.com/spiral/roadrunner/v2/plugins/informer" + "github.com/spiral/roadrunner/v2/plugins/temporal/activity" + temporalClient "github.com/spiral/roadrunner/v2/plugins/temporal/client" + "github.com/spiral/roadrunner/v2/plugins/temporal/workflow" "github.com/spiral/roadrunner/v2/plugins/kv/boltdb" "github.com/spiral/roadrunner/v2/plugins/kv/memcached" @@ -52,6 +55,11 @@ func main() { &memory.Plugin{}, // boltdb driver &boltdb.Plugin{}, + + // temporal plugins + &temporalClient.Plugin{}, + &activity.Plugin{}, + &workflow.Plugin{}, ) if err != nil { log.Fatal(err) @@ -8,16 +8,19 @@ require ( github.com/alicebob/miniredis/v2 v2.14.1 github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b github.com/buger/goterm v0.0.0-20200322175922-2f3e71b85129 + github.com/cenkalti/backoff/v4 v4.1.0 github.com/dustin/go-humanize v1.0.0 github.com/fatih/color v1.10.0 github.com/go-ole/go-ole v1.2.5 // indirect github.com/go-redis/redis/v8 v8.4.10 github.com/gofiber/fiber/v2 v2.3.3 github.com/golang/mock v1.4.4 + github.com/golang/protobuf v1.4.3 github.com/hashicorp/go-multierror v1.1.0 github.com/json-iterator/go v1.1.10 github.com/mattn/go-runewidth v0.0.10 github.com/olekukonko/tablewriter v0.0.4 + github.com/pborman/uuid v1.2.1 github.com/prometheus/client_golang v1.9.0 github.com/shirou/gopsutil v3.20.12+incompatible github.com/spf13/cobra v1.1.1 @@ -30,9 +33,12 @@ require ( github.com/vbauerster/mpb/v5 v5.4.0 github.com/yookoala/gofast v0.4.0 go.etcd.io/bbolt v1.3.5 + go.temporal.io/api v1.4.0 + go.temporal.io/sdk v1.4.0 go.uber.org/multierr v1.6.0 go.uber.org/zap v1.16.0 - golang.org/x/net v0.0.0-20201216054612-986b41b23924 + golang.org/x/net v0.0.0-20201224014010-6772e930b67b golang.org/x/sync v0.0.0-20201207232520-09787c993a3a - golang.org/x/sys v0.0.0-20201221093633-bc327ba9c2f0 + golang.org/x/sys v0.0.0-20210105210732-16f7687f5001 + google.golang.org/protobuf v1.25.0 ) @@ -74,6 +74,7 @@ github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5P github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/clbanning/x2j v0.0.0-20191024224557-825249438eec/go.mod h1:jMjuTZXRI4dUb/I5gc9Hdhagfvm9+RyrPryS/auMzxE= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8= github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI= github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= @@ -87,6 +88,7 @@ github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfc github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -102,8 +104,12 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1 github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M= github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g= +github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5ynNVH9qI8YYLbd1fK2po= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a h1:yDWHCSQ40h88yih2JAcL6Ls/kVkSE8GFACTGVnMPruw= +github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a/go.mod h1:7Ga40egUymuWXxAe151lTNnCv97MddSOVsjpPPkityA= github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.10.0 h1:s36xzo75JdqLaaWoiEHk767eHiwo0598uUxyfiPkDsg= @@ -123,20 +129,30 @@ github.com/go-kit/kit v0.10.0/go.mod h1:xUsJbQ/Fp4kEt7AFgCuvyX4a71u8h9jB8tj/ORgO github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= +github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM= github.com/go-ole/go-ole v1.2.5 h1:t4MGB5xEDZvXI+0rMjjsfBsD7yAgp/s9ZDkL1JndXwY= github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= +github.com/go-redis/redis/v8 v8.4.4/go.mod h1:nA0bQuF0i5JFx4Ta9RZxGKXFrQ8cRWntra97f0196iY= github.com/go-redis/redis/v8 v8.4.10 h1:fWdl0RBmVibUDOp8bqz1e2Yy9dShOeIeWsiAifYk06Y= github.com/go-redis/redis/v8 v8.4.10/go.mod h1:d5yY/TlkQyYBSBHnXUmnf1OrHbyQere5JV4dLKwvXmo= github.com/go-restit/lzjson v0.0.0-20161206095556-efe3c53acc68/go.mod h1:7vXSKQt83WmbPeyVjCfNT9YDJ5BUFmcwFsEjI9SCvYM= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/gofiber/fiber/v2 v2.3.0/go.mod h1:f8BRRIMjMdRyt2qmJ/0Sea3j3rwwfufPrh9WNBRiVZ0= github.com/gofiber/fiber/v2 v2.3.3 h1:nsjc9TfCl+ojXgEAu+uAT1Le7iQtZJ+Gfb/ox6+BM4w= github.com/gofiber/fiber/v2 v2.3.3/go.mod h1:f8BRRIMjMdRyt2qmJ/0Sea3j3rwwfufPrh9WNBRiVZ0= +github.com/gogo/googleapis v0.0.0-20180223154316-0cd9801be74a/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s= github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s= +github.com/gogo/googleapis v1.4.0 h1:zgVt4UpGxcqVOw97aRGxT4svlcmdK35fynLNctY32zI= +github.com/gogo/googleapis v1.4.0/go.mod h1:5YRNX2z1oM5gXdAkurHa942MDgEJyk02w4OecKY87+c= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= +github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls= +github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= +github.com/gogo/status v1.1.0 h1:+eIkrewn5q6b30y+g/BJINVVdi2xH7je5MPJ3ZPK3JA= +github.com/gogo/status v1.1.0/go.mod h1:BFv9nrluPLmrS0EmGVvLaPNmRosr9KapBYd5/hpY1WM= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -156,6 +172,7 @@ github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:x github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.4.3 h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM= @@ -167,6 +184,7 @@ github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5a github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= @@ -175,6 +193,9 @@ github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OI github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.1.4 h1:0ecGp3skIrHWPNGPJDaBIghfA6Sp7Ruo2Io8eLKzWm0= +github.com/google/uuid v1.1.4/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/gopherjs/gopherjs v0.0.0-20180825215210-0210a2f0f73c/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= @@ -238,6 +259,7 @@ github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfV github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= +github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.10.7 h1:7rix8v8GpI3ZBb0nSozFRgbtXKv+hOe+qfEpZqybrAg= github.com/klauspost/compress v1.10.7/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= @@ -250,6 +272,7 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM= github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4= github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ= @@ -297,6 +320,7 @@ github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzE github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/oklog/oklog v0.3.2/go.mod h1:FCV+B7mhrz4o+ueLpx+KqkyXRGMWOYEvfiXtdGtbWGs= @@ -320,6 +344,8 @@ github.com/opentracing-contrib/go-observer v0.0.0-20170622124052-a52f23424492/go github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74= github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= +github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs= +github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= github.com/openzipkin-contrib/zipkin-go-opentracing v0.4.5/go.mod h1:/wsWhb9smxSfWAKL3wpBW7V8scJMt8N8gnaMCS9E/cA= github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJc5AZX7/PBEpw= github.com/openzipkin/zipkin-go v0.2.1/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnhQw8ySjnjRyN4= @@ -327,6 +353,8 @@ github.com/openzipkin/zipkin-go v0.2.2/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnh github.com/pact-foundation/pact-go v1.0.4/go.mod h1:uExwJY4kCzNPcHRj+hCR/HBbOOIwwtUjcrb0b5/5kLM= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= +github.com/pborman/uuid v1.2.1 h1:+ZZIw58t/ozdjRaXh/3awHfmWRbzYxJoAdNJxe/3pvw= +github.com/pborman/uuid v1.2.1/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/performancecopilot/speed v3.0.0+incompatible/go.mod h1:/CLtqpZ5gBg1M9iaPbIdPPGyKcA8hKdoy6hAWba7Yac= @@ -383,6 +411,8 @@ github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40T github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rivo/uniseg v0.1.0 h1:+2KBaVoUmb9XzDsrx/Ct0W/EYOSFf/nWTauy++DprtY= github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ= +github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= @@ -424,8 +454,12 @@ github.com/spf13/viper v1.7.0 h1:xVKxvI7ouOI5I+U9s2eeiUfMaWBVoXA3AWskkrqK0VM= github.com/spf13/viper v1.7.0/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg= github.com/spf13/viper v1.7.1 h1:pM5oEahlgWv/WnHXpgbKz7iLIxRf65tye2Ci+XFK5sk= github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg= +github.com/spiral/endure v1.0.0-beta20/go.mod h1:qCU2/4gAItVESzUK0yPExmUTlTcpRLqJUgcV+nqxn+o= github.com/spiral/endure v1.0.0-beta21 h1:YW3gD6iNhRByG/yFkm/Ko+nj+oTBsjBtPVHFA2nt67k= github.com/spiral/endure v1.0.0-beta21/go.mod h1:GsItn+dYSO4O5uwvfki23xyxRnmBhxEyL6jBeJQoFPw= +github.com/spiral/endure v1.0.0-beta9 h1:PNiVit9DCucmhZLd4RgoiVL7Y5DBmweUwadFyulAct8= +github.com/spiral/endure v1.0.0-beta9/go.mod h1:EhC6CKaSQum/gz1zRqkyu4LqFOlngVTGbXK69pebmxQ= +github.com/spiral/errors v1.0.4/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o= github.com/spiral/errors v1.0.5 h1:TwlR9cZtTgnZrSngcEUpyiMO9yJ45gdQ+XcrCRoCCAM= github.com/spiral/errors v1.0.5/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o= github.com/spiral/errors v1.0.7/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o= @@ -438,9 +472,12 @@ github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3 github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.3.0 h1:NGXK3lHquSN08v5vWalVI/L8XU9hdzE/G6xsrze47As= +github.com/stretchr/objx v0.3.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= @@ -449,6 +486,10 @@ github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= +github.com/uber-go/tally v3.3.17+incompatible h1:nFHIuW3VQ22wItiE9kPXic8dEgExWOsVOHwpmoIvsMw= +github.com/uber-go/tally v3.3.17+incompatible/go.mod h1:YDTIBxdXyOU/sCWilKB4bgyufu1cEi0jdVnRdxvjnmU= +github.com/uber/jaeger-client-go v2.23.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk= +github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= @@ -464,6 +505,8 @@ github.com/vmihailenco/msgpack v4.0.4+incompatible/go.mod h1:fy3FlTQTDXWkZ7Bh6Ac github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/yookoala/gofast v0.4.0 h1:dLBjghcsbbZNOEHN8N1X/gh9S6srmJed4WQfG7DlKwo= github.com/yookoala/gofast v0.4.0/go.mod h1:rfbkoKaQG1bnuTUZcmV3vAlnfpF4FTq8WbQJf2vcpg8= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/gopher-lua v0.0.0-20191220021717-ab39c6098bdb h1:ZkM6LRnq40pR1Ox0hTHlnpkcOTuFIDQpZ1IN8rKKhX0= github.com/yuin/gopher-lua v0.0.0-20191220021717-ab39c6098bdb/go.mod h1:gqRgreBUhTSL0GeU64rtZ3Uq3wtjOa/TB2YfrtkCbVQ= go.etcd.io/bbolt v1.3.2 h1:Z/90sZLPOeCy2PwprqkFa25PdkusRzaj9P8zm/KNyvk= @@ -477,14 +520,22 @@ go.opencensus.io v0.20.2/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= +go.opentelemetry.io/otel v0.15.0/go.mod h1:e4GKElweB8W2gWUqbghw0B8t5MCTccc9212eNHnOHwA= +go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opentelemetry.io/otel v0.16.0 h1:uIWEbdeb4vpKPGITLsRVUS44L5oDbDUCZxn8lkxhmgw= go.opentelemetry.io/otel v0.16.0/go.mod h1:e4GKElweB8W2gWUqbghw0B8t5MCTccc9212eNHnOHwA= +go.temporal.io/api v1.4.0 h1:Ga1Ih8YE5ULs+UGt7u6Ppcf5SUMLyh4BATAs6SyPO0w= +go.temporal.io/api v1.4.0/go.mod h1:H0yXehwGE9Sn9zVruyy9aumq17SMsK1WmIy4GX3MIKw= +go.temporal.io/sdk v1.4.0 h1:IDIgfhakfgMv+zOMCQkXyqR7zHH+T4Nt2VAH5qW4w3w= +go.temporal.io/sdk v1.4.0/go.mod h1:hZ3Jd/Aoom1ao+fRyFf6y3MfHwXdPlhJiykX/gGmBeA= +go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/goleak v1.0.0/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= @@ -520,11 +571,15 @@ golang.org/x/lint v0.0.0-20190409202823-959b441ac422/go.mod h1:6SW0HCj/g11FgYtHl golang.org/x/lint v0.0.0-20190909230951-414d861bb4ac/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs= golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= +golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE= golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o= golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY= golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -544,12 +599,16 @@ golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201016165138-7b1cca2348c0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201216054612-986b41b23924 h1:QsnDpLLOKwHBBDa8nDws4DYNc/ryVW2vCpxCs09d4PY= golang.org/x/net v0.0.0-20201216054612-986b41b23924/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20201224014010-6772e930b67b h1:iFwSg7t5GZmB/Q5TjiEAsdoLDrdJRC1RiF2WhuV29Qw= +golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -560,6 +619,7 @@ golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -604,6 +664,8 @@ golang.org/x/sys v0.0.0-20201218084310-7d0127a74742 h1:+CBz4km/0KPU3RGTwARGh/noP golang.org/x/sys v0.0.0-20201218084310-7d0127a74742/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201221093633-bc327ba9c2f0 h1:n+DPcgTwkgWzIFpLmoimYR2K2b0Ga5+Os4kayIN0vGo= golang.org/x/sys v0.0.0-20201221093633-bc327ba9c2f0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210105210732-16f7687f5001 h1:/dSxr6gT0FNI1MO5WLJo8mTmItROeOKTkDn+7OwWBos= +golang.org/x/sys v0.0.0-20210105210732-16f7687f5001/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -611,14 +673,19 @@ golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.4 h1:0YWbFKbhXG/wIiuHDSKpS0Iy7FSA+u45VtBMfQcFTTc= +golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.0.0-20201208040808-7e3f01d25324 h1:Hir2P/De0WpUhtrKGGjvSb2YxUgyZ7EFOSLIcSSpiwE= +golang.org/x/time v0.0.0-20201208040808-7e3f01d25324/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180726210403-bfb5194568d3/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= @@ -636,14 +703,21 @@ golang.org/x/tools v0.0.0-20190911174233-4f2ddba30aff/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc h1:NCy3Ohtk6Iny5V/reW2Ktypo4zIpWBdRJ1uFMjBxdg8= golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200103221440-774c71fcf114 h1:DnSr2mCsxyCE6ZgIkmcWUQY2R5cH/6wL7eIxEmQOMSE= golang.org/x/tools v0.0.0-20200103221440-774c71fcf114/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= +golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= +golang.org/x/tools v0.0.0-20200605181038-cef9fc3bc8f0 h1:gxU2P+MOOGAWge5BKP+BzqSeegxvDBRib5rk3yZDDuI= +golang.org/x/tools v0.0.0-20200605181038-cef9fc3bc8f0/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210115202250-e0d201561e39/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/api v0.3.1/go.mod h1:6wY9I6uQWHQ8EM57III9mq/AjF+i8G65rmVagqKMtkk= google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE= google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M= @@ -656,6 +730,7 @@ google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7 google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.6.1 h1:QzqyMA1tlu6CgqCDUtU9V+ZKhLFT2dkJuANu5QaxI3I= google.golang.org/appengine v1.6.1/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0= +google.golang.org/genproto v0.0.0-20180518175338-11a468237815/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/genproto v0.0.0-20190418145605-e7d98fc518a7/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= @@ -667,6 +742,11 @@ google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98 google.golang.org/genproto v0.0.0-20190911173649-1774047e7e51/go.mod h1:IbNlFCBrqXvoKpeg0TB2l7cyZUmoaFKYIwrEpbDKLA8= google.golang.org/genproto v0.0.0-20191108220845-16a3f7862a1a h1:Ob5/580gVHBJZgXnff1cZDbG+xLtMVE5mDRTe+nIsX4= google.golang.org/genproto v0.0.0-20191108220845-16a3f7862a1a/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= +google.golang.org/genproto v0.0.0-20201214200347-8c77b98c765d/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= +google.golang.org/genproto v0.0.0-20210106152847-07624b53cd92 h1:jOTk2Z6KYaWoptUFqZ167cS8peoUPjFEXrsqfVkkCGc= +google.golang.org/genproto v0.0.0-20210106152847-07624b53cd92/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= +google.golang.org/grpc v1.12.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.0/go.mod h1:chYK+tFQF0nDUGJgXMSgLCQk3phJEuONr2DCgLDdAQM= @@ -677,20 +757,30 @@ google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ij google.golang.org/grpc v1.22.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.23.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= google.golang.org/grpc v1.26.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/grpc v1.34.0 h1:raiipEjMOIC/TO2AvyTxP25XFdLxNIBwzDh3FM3XztI= +google.golang.org/grpc v1.34.0/go.mod h1:WotjhfgOW/POjDeRt8vscBtXq+2VjORFy659qA51WJ8= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM= google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4= +google.golang.org/protobuf v1.25.0 h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4c= +google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/cheggaaa/pb.v1 v1.0.25/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= @@ -714,6 +804,8 @@ gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20210106172901-c476de37821d h1:827r06Ng1EGlK/5Qb/mj+yHDj6pgKf5CjoX4v24FRJ0= +gopkg.in/yaml.v3 v3.0.0-20210106172901-c476de37821d/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go index 4d64ac6d..3672f5ac 100644 --- a/plugins/http/plugin.go +++ b/plugins/http/plugin.go @@ -34,7 +34,7 @@ const ( PluginName = "http" // RR_HTTP env variable key (internal) if the HTTP presents - RR_HTTP = "RR_HTTP" //nolint:golint,stylecheck + RR_MODE = "RR_MODE" //nolint:golint,stylecheck // HTTPS_SCHEME HTTPS_SCHEME = "https" //nolint:golint,stylecheck @@ -101,7 +101,7 @@ func (s *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se s.cfg.Env = make(map[string]string) } - s.cfg.Env[RR_HTTP] = "true" + s.cfg.Env[RR_MODE] = "http" s.pool, err = server.NewWorkerPool(context.Background(), pool.Config{ Debug: s.cfg.Pool.Debug, diff --git a/plugins/temporal/activity/activity_pool.go b/plugins/temporal/activity/activity_pool.go new file mode 100644 index 00000000..d09722ce --- /dev/null +++ b/plugins/temporal/activity/activity_pool.go @@ -0,0 +1,197 @@ +package activity + +import ( + "context" + "sync" + "sync/atomic" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/pool" + rrWorker "github.com/spiral/roadrunner/v2/pkg/worker" + "github.com/spiral/roadrunner/v2/plugins/server" + "github.com/spiral/roadrunner/v2/plugins/temporal/client" + rrt "github.com/spiral/roadrunner/v2/plugins/temporal/protocol" + "go.temporal.io/api/common/v1" + "go.temporal.io/sdk/activity" + "go.temporal.io/sdk/converter" + "go.temporal.io/sdk/internalbindings" + "go.temporal.io/sdk/worker" +) + +// RR_MODE env variable +const RR_MODE = "RR_MODE" //nolint:golint,stylecheck +// RR_CODEC env variable +const RR_CODEC = "RR_CODEC" //nolint:golint,stylecheck + +// +const doNotCompleteOnReturn = "doNotCompleteOnReturn" + +type activityPool interface { + Start(ctx context.Context, temporal client.Temporal) error + Destroy(ctx context.Context) error + Workers() []rrWorker.SyncWorker + ActivityNames() []string + GetActivityContext(taskToken []byte) (context.Context, error) +} + +type activityPoolImpl struct { + dc converter.DataConverter + codec rrt.Codec + seqID uint64 + activities []string + wp pool.Pool + tWorkers []worker.Worker + running sync.Map +} + +// newActivityPool +func newActivityPool(codec rrt.Codec, listener events.Listener, poolConfig pool.Config, server server.Server) (activityPool, error) { + const op = errors.Op("new_activity_pool") + // env variables + env := map[string]string{RR_MODE: RRMode, RR_CODEC: codec.GetName()} + wp, err := server.NewWorkerPool(context.Background(), poolConfig, env, listener) + if err != nil { + return nil, errors.E(op, err) + } + + return &activityPoolImpl{ + codec: codec, + wp: wp, + running: sync.Map{}, + }, nil +} + +// initWorkers request workers info from underlying PHP and configures temporal workers linked to the pool. +func (pool *activityPoolImpl) Start(ctx context.Context, temporal client.Temporal) error { + const op = errors.Op("activity_pool_start") + pool.dc = temporal.GetDataConverter() + + err := pool.initWorkers(ctx, temporal) + if err != nil { + return errors.E(op, err) + } + + for i := 0; i < len(pool.tWorkers); i++ { + err := pool.tWorkers[i].Start() + if err != nil { + return errors.E(op, err) + } + } + + return nil +} + +// initWorkers request workers info from underlying PHP and configures temporal workers linked to the pool. +func (pool *activityPoolImpl) Destroy(ctx context.Context) error { + for i := 0; i < len(pool.tWorkers); i++ { + pool.tWorkers[i].Stop() + } + + pool.wp.Destroy(ctx) + return nil +} + +// Workers returns list of all allocated workers. +func (pool *activityPoolImpl) Workers() []rrWorker.SyncWorker { + return pool.wp.Workers() +} + +// ActivityNames returns list of all available activity names. +func (pool *activityPoolImpl) ActivityNames() []string { + return pool.activities +} + +// ActivityNames returns list of all available activity names. +func (pool *activityPoolImpl) GetActivityContext(taskToken []byte) (context.Context, error) { + const op = errors.Op("activity_pool_get_activity_context") + c, ok := pool.running.Load(string(taskToken)) + if !ok { + return nil, errors.E(op, errors.Str("heartbeat on non running activity")) + } + + return c.(context.Context), nil +} + +// initWorkers request workers workflows from underlying PHP and configures temporal workers linked to the pool. +func (pool *activityPoolImpl) initWorkers(ctx context.Context, temporal client.Temporal) error { + const op = errors.Op("activity_pool_create_temporal_worker") + + workerInfo, err := rrt.FetchWorkerInfo(pool.codec, pool.wp, temporal.GetDataConverter()) + if err != nil { + return errors.E(op, err) + } + + pool.activities = make([]string, 0) + pool.tWorkers = make([]worker.Worker, 0) + + for i := 0; i < len(workerInfo); i++ { + w, err := temporal.CreateWorker(workerInfo[i].TaskQueue, workerInfo[i].Options) + if err != nil { + return errors.E(op, err, pool.Destroy(ctx)) + } + + pool.tWorkers = append(pool.tWorkers, w) + for j := 0; j < len(workerInfo[i].Activities); j++ { + w.RegisterActivityWithOptions(pool.executeActivity, activity.RegisterOptions{ + Name: workerInfo[i].Activities[j].Name, + DisableAlreadyRegisteredCheck: false, + }) + + pool.activities = append(pool.activities, workerInfo[i].Activities[j].Name) + } + } + + return nil +} + +// executes activity with underlying worker. +func (pool *activityPoolImpl) executeActivity(ctx context.Context, args *common.Payloads) (*common.Payloads, error) { + const op = errors.Op("activity_pool_execute_activity") + + heartbeatDetails := &common.Payloads{} + if activity.HasHeartbeatDetails(ctx) { + err := activity.GetHeartbeatDetails(ctx, &heartbeatDetails) + if err != nil { + return nil, errors.E(op, err) + } + } + + var info = activity.GetInfo(ctx) + var msg = rrt.Message{ + ID: atomic.AddUint64(&pool.seqID, 1), + Command: rrt.InvokeActivity{ + Name: info.ActivityType.Name, + Info: info, + HeartbeatDetails: len(heartbeatDetails.Payloads), + }, + Payloads: args, + } + + if len(heartbeatDetails.Payloads) != 0 { + msg.Payloads.Payloads = append(msg.Payloads.Payloads, heartbeatDetails.Payloads...) + } + + pool.running.Store(string(info.TaskToken), ctx) + defer pool.running.Delete(string(info.TaskToken)) + + result, err := pool.codec.Execute(pool.wp, rrt.Context{TaskQueue: info.TaskQueue}, msg) + if err != nil { + return nil, errors.E(op, err) + } + + if len(result) != 1 { + return nil, errors.E(op, errors.Str("invalid activity worker response")) + } + + out := result[0] + if out.Failure != nil { + if out.Failure.Message == doNotCompleteOnReturn { + return nil, activity.ErrResultPending + } + + return nil, internalbindings.ConvertFailureToError(out.Failure, pool.dc) + } + + return out.Payloads, nil +} diff --git a/plugins/temporal/activity/plugin.go b/plugins/temporal/activity/plugin.go new file mode 100644 index 00000000..5e562a8d --- /dev/null +++ b/plugins/temporal/activity/plugin.go @@ -0,0 +1,215 @@ +package activity + +import ( + "context" + "time" + + "github.com/cenkalti/backoff/v4" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/worker" + + "sync" + "sync/atomic" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/server" + "github.com/spiral/roadrunner/v2/plugins/temporal/client" +) + +const ( + // PluginName defines public service name. + PluginName = "activities" + + // RRMode sets as RR_MODE env variable to let worker know about the mode to run. + RRMode = "temporal/activity" +) + +// Plugin to manage activity execution. +type Plugin struct { + temporal client.Temporal + events events.Handler + server server.Server + log logger.Logger + mu sync.Mutex + reset chan struct{} + pool activityPool + closing int64 +} + +// Init configures activity service. +func (p *Plugin) Init(temporal client.Temporal, server server.Server, log logger.Logger) error { + const op = errors.Op("activity_plugin_init") + if temporal.GetConfig().Activities == nil { + // no need to serve activities + return errors.E(op, errors.Disabled) + } + + p.temporal = temporal + p.server = server + p.events = events.NewEventsHandler() + p.log = log + p.reset = make(chan struct{}) + + return nil +} + +// Serve activities with underlying workers. +func (p *Plugin) Serve() chan error { + const op = errors.Op("activity_plugin_serve") + + errCh := make(chan error, 1) + pool, err := p.startPool() + if err != nil { + errCh <- errors.E(op, err) + return errCh + } + + p.pool = pool + + go func() { + for { + select { + case <-p.reset: + if atomic.LoadInt64(&p.closing) == 1 { + return + } + + err := p.replacePool() + if err == nil { + continue + } + + bkoff := backoff.NewExponentialBackOff() + bkoff.InitialInterval = time.Second + + err = backoff.Retry(p.replacePool, bkoff) + if err != nil { + errCh <- errors.E(op, err) + } + } + } + }() + + return errCh +} + +// Stop stops the serving plugin. +func (p *Plugin) Stop() error { + atomic.StoreInt64(&p.closing, 1) + const op = errors.Op("activity_plugin_stop") + + pool := p.getPool() + if pool != nil { + p.pool = nil + err := pool.Destroy(context.Background()) + if err != nil { + return errors.E(op, err) + } + return nil + } + + return nil +} + +// Name of the service. +func (p *Plugin) Name() string { + return PluginName +} + +// RPC returns associated rpc service. +func (p *Plugin) RPC() interface{} { + return &rpc{srv: p, client: p.temporal.GetClient()} +} + +// Workers returns pool workers. +func (p *Plugin) Workers() []worker.SyncWorker { + return p.getPool().Workers() +} + +// ActivityNames returns list of all available activities. +func (p *Plugin) ActivityNames() []string { + return p.pool.ActivityNames() +} + +// Reset resets underlying workflow pool with new copy. +func (p *Plugin) Reset() error { + p.reset <- struct{}{} + + return nil +} + +// AddListener adds event listeners to the service. +func (p *Plugin) AddListener(listener events.Listener) { + p.events.AddListener(listener) +} + +// AddListener adds event listeners to the service. +func (p *Plugin) poolListener(event interface{}) { + if ev, ok := event.(events.PoolEvent); ok { + if ev.Event == events.EventPoolError { + p.log.Error("Activity pool error", "error", ev.Payload.(error)) + p.reset <- struct{}{} + } + } + + p.events.Push(event) +} + +// AddListener adds event listeners to the service. +func (p *Plugin) startPool() (activityPool, error) { + pool, err := newActivityPool( + p.temporal.GetCodec().WithLogger(p.log), + p.poolListener, + *p.temporal.GetConfig().Activities, + p.server, + ) + + if err != nil { + return nil, errors.E(errors.Op("newActivityPool"), err) + } + + err = pool.Start(context.Background(), p.temporal) + if err != nil { + return nil, errors.E(errors.Op("startActivityPool"), err) + } + + p.log.Debug("Started activity processing", "activities", pool.ActivityNames()) + + return pool, nil +} + +func (p *Plugin) replacePool() error { + pool, err := p.startPool() + if err != nil { + p.log.Error("Replace activity pool failed", "error", err) + return errors.E(errors.Op("newActivityPool"), err) + } + + p.log.Debug("Replace activity pool") + + var previous activityPool + + p.mu.Lock() + previous, p.pool = p.pool, pool + p.mu.Unlock() + + errD := previous.Destroy(context.Background()) + if errD != nil { + p.log.Error( + "Unable to destroy expired activity pool", + "error", + errors.E(errors.Op("destroyActivityPool"), errD), + ) + } + + return nil +} + +// getPool returns currently pool. +func (p *Plugin) getPool() activityPool { + p.mu.Lock() + defer p.mu.Unlock() + + return p.pool +} diff --git a/plugins/temporal/activity/rpc.go b/plugins/temporal/activity/rpc.go new file mode 100644 index 00000000..49efcd4f --- /dev/null +++ b/plugins/temporal/activity/rpc.go @@ -0,0 +1,66 @@ +package activity + +import ( + v1Proto "github.com/golang/protobuf/proto" //nolint:staticcheck + commonpb "go.temporal.io/api/common/v1" + "go.temporal.io/sdk/activity" + "go.temporal.io/sdk/client" + "google.golang.org/protobuf/proto" +) + +/* +- the method's type is exported. +- the method is exported. +- the method has two arguments, both exported (or builtin) types. +- the method's second argument is a pointer. +- the method has return type error. +*/ +type rpc struct { + srv *Plugin + client client.Client +} + +// RecordHeartbeatRequest sent by activity to record current state. +type RecordHeartbeatRequest struct { + TaskToken []byte `json:"taskToken"` + Details []byte `json:"details"` +} + +// RecordHeartbeatResponse sent back to the worker to indicate that activity was cancelled. +type RecordHeartbeatResponse struct { + Canceled bool `json:"canceled"` +} + +// RecordActivityHeartbeat records heartbeat for an activity. +// taskToken - is the value of the binary "TaskToken" field of the "ActivityInfo" struct retrieved inside the activity. +// details - is the progress you want to record along with heart beat for this activity. +// The errors it can return: +// - EntityNotExistsError +// - InternalServiceError +// - CanceledError +func (r *rpc) RecordActivityHeartbeat(in RecordHeartbeatRequest, out *RecordHeartbeatResponse) error { + details := &commonpb.Payloads{} + + if len(in.Details) != 0 { + if err := proto.Unmarshal(in.Details, v1Proto.MessageV2(details)); err != nil { + return err + } + } + + // find running activity + ctx, err := r.srv.getPool().GetActivityContext(in.TaskToken) + if err != nil { + return err + } + + activity.RecordHeartbeat(ctx, details) + + select { + case <-ctx.Done(): + *out = RecordHeartbeatResponse{Canceled: true} + default: + *out = RecordHeartbeatResponse{Canceled: false} + } + + return nil +} diff --git a/plugins/temporal/client/doc/doc.go b/plugins/temporal/client/doc/doc.go new file mode 100644 index 00000000..10257070 --- /dev/null +++ b/plugins/temporal/client/doc/doc.go @@ -0,0 +1 @@ +package doc diff --git a/plugins/temporal/client/doc/temporal.drawio b/plugins/temporal/client/doc/temporal.drawio new file mode 100644 index 00000000..f2350af8 --- /dev/null +++ b/plugins/temporal/client/doc/temporal.drawio @@ -0,0 +1 @@ +<mxfile host="Electron" modified="2020-10-20T11:17:09.390Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/13.7.9 Chrome/85.0.4183.121 Electron/10.1.3 Safari/537.36" etag="U-Sm3BmbD4KaqDkjKEOx" version="13.7.9" type="device"><diagram name="Page-1" id="6133507b-19e7-1e82-6fc7-422aa6c4b21f">7V1tc6M2EP41nsl1Jh5eDIaPsZPcdXptc83N5dpvspFtNRhRkGP7fn0lkDAgSEgCxrkoH2K0SEJon919VrwNzOl69zEC4ep37EF/YGjebmBeDgxD17UR/WGSPZdohpVKlhHyuOwguEU/oKjIpRvkwbhQkWDsExQWhXMcBHBOCjIQRXhbrLbAfvGoIVhCSXA7B74svUMeWaVSwzTtw45PEC1X4tC2xU95Bub3ywhvAn7AgWEukr909xqIzviZxivg4W1OZF4NzGmEMUm31rsp9NnsinlL213X7M0GHsGANGkwc0ezuW46c+A6GlycO3xYZC/mAnp0angRR2SFlzgA/tVBOklOF7IONVpakbVPN3W66YMZ9CfZjEyxjyO6K8ABaxYTEJELpq2S7Br5Pu8sKXN8OLQMA6/Ugkpy9WmJ19ZtWlzggFyDNfIZEL/ByAMB4GJRjc74RJ40Po8x3kRzPg9jx7V13XIXmg1dBzrnI5MjE0RLSGoq6eO0EpvFXNdcGR8hXkMS7WmFCPqAoIciAAHH8TKrx5vSSQD7XIUQo4DEuZ5vmIBWyGxyzAHHTdLR3CIuntmAbqRjEKXcyRxECdiaAU/nHuIB+Bs+CdpwqEtwJHBHiiiLYIx+gFlSgWGAzwWtbU0G1iWVAB8tAyqYU/1CisDJA4wIouZ+wXeskeclUK5B7MGKZfBUYkwCVI2dsYHAXUHn3KfxEyq4hQJgeKtzbWiNR1ZBV+cO91PPA5UEAqcIgXGpA7xYxBT4RVS8Gge2hAMFgqdAoA0dV3gagQG9FQhkdpl16x4BBO6biUIeAmsceF9XKOg1GDlGg2AkIlZ7waig8+dYuStb+XD4y89q6OJs2/D2zsjuxNKFSvaV7bvx9Zqk8FO1cxzCN2DgrnUqBi7ymyKd+2kNXCC5FQunarXaMWm95Ci0Ixi1GOZ7IHDtqV0b2q5b6YFfiQGjBAHjGH5dQsDAsH2m8LAAA/u/DVvnmKypT2P86YLu1cId/Z+4SS2VnxMcpvtGuX0MQeccEmwfR0XWJ91a8t/kyCgnAOuQoUQu/cr6WADqhIW42KTQI50VVD7KLJIkQvAVrkMcAT/XfFauTGVhWfaiKfPhguTmrG5S5OFWjWAVsSGIRTrRl17fbTdDZnU/QjL1EVX12QdaOpsn28NUNDCmLPTRgB19qDpB0R4HC7RM2qebNXW5yoNZzH6mEQQE3uHoHkZnBMT3XzaQgpudZ4SCZXpsHBKEg5hubZOKwz9TQTJWLkq7qBprcepLDrPs6xKbyFxiOncTTGst/ISvLBKW8ribm1K+gdhYtD/gtuSNCyQrW6RkhTleoznffopyRfg+W0DV+eFz3pj9XV8/Rokkx1rrQEduabGCF7eH9VtLrGes8ku35aD4EvZjmJZl2WPHAI4JbU+r8IBTH8RxANZQUm28RWsfJDOW00CiOV5JbxL9UkTMV8j3PoM93rDzo4R2fi9KkxWO0A/aLRDHyPNdwy7UuGUtuZaT0AxvhHayaA0/g5hkqPB9EMZolg04NesJJgSvBY74mTIKXRWUu0Jd7nC8UiOq/npQ2qMiKHVNXCfIwVK3q2BpOx3A0qgIzMwtLhBkV0iY8yX7UIaoTNfSWS7PqjzRJQ9VDdo4BHPqRD8ndS5HB8lffEKYaLtCBN6GIMmQthEIiw5vRclfkqJFmABSSyQndK6n2tBilJL6YIuFMVFOWCaN0SxE0PMDKNE7pCDfwvjglfOIqDX8pyHCIWHYzQAh6rWKB1Ph4dTwYBk94kG+BqPw0DMexk6PeJCvxUiq91GZuTQKwE/oPUvluaK/MhxcnusSGEwZDGaFkhPKcINjxPg4lUVp3YpVhmPrN7tZ4ikFd8EHxjX2TnteYe+M2foHZfa9wEI/YhyovWtB5S8qfykk1ZacVVenL3oL7kqCpaH4asuOqt7yTzKBkRExUog4OUQcM4WRhyvfWaA4a9sabpqUdBIEFGk9Wct3+owFI5kdKNL6DkmrbozLrLXxovuoC1wqjtK1p8pM/22w1pFaZz09RPTKWq2qW2YUa21Xw32yVoEuxVpPz/KPyVqluyUVLE4VFkddgi/jom6lU+Gif1yYR7wkK43Xlte3VJb7DrNcxy4muc1vLXONDmApUm6V0nTmqTLLfxtJ7rhqnValNO1quGnS2kVKI55IU5HofUciab2VhiKzGSz1URexyFELrl3Hosz230YscqouCitE9IuIXhdc3Sq+qthJuxruc8HVrQsCagWld8vvc8FVweJkYdHvPc+ufFFWpTPvMZ2x9JenMy28SUUGZtVTOIq8tumrMtt/G+mMrslLLwoSfUOi39uetaorhiqhaVfFfSY02YOCirqenu33mdHU3UyoYNE7LHp+ilNzJGiolOY9pjSa+9KbBfTy+35bQqZakO/86R2t7vXSJ5rU6OoWkhPERL9ZTcXb4FVW07qOe01rKt41ofjrqVj/MfOai5vb8fe7y4urL1eju+Cf6NO33+yKFylezAl6QGR/C6MH9vLKehr7FrhrKvod7AoVn8toH32tZxsE0ijxR4GBHC6yu4AK9NFswWdUAqPBCujhjbsaOrzYt9p9HN9nHNk/PAqRoqVIeHnUMl/DLisx05kvafDUZfp23mswJzgZusJN/7ipYKDHxU2D628iKN1grNzNacCmgtQeFzbNL8j8vPnKM4lJO5qveqVipeq74ibtXKh53z6iM3BULMkf1y+8/hObhzqfcWLmzMr/hYQmRWkmAjYEF3NiuEPke277b9YV1VNauhTpR1LYi0JAT/d7vpC2MixRPrRLSqLhy9KR/OePHqP8+S8g1Xx5pPPvH9Hi4Rut6RdTDp/CNa/+Bw==</diagram></mxfile>
\ No newline at end of file diff --git a/plugins/temporal/client/plugin.go b/plugins/temporal/client/plugin.go new file mode 100644 index 00000000..047a1815 --- /dev/null +++ b/plugins/temporal/client/plugin.go @@ -0,0 +1,169 @@ +package client + +import ( + "fmt" + "os" + "sync/atomic" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/pool" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/logger" + rrt "github.com/spiral/roadrunner/v2/plugins/temporal/protocol" + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/converter" + "go.temporal.io/sdk/worker" +) + +// PluginName defines public service name. +const PluginName = "temporal" + +// indicates that the case size was set +var stickyCacheSet = false + +// Plugin implement Temporal contract. +type Plugin struct { + workerID int32 + cfg *Config + dc converter.DataConverter + log logger.Logger + client client.Client +} + +// Temporal define common interface for RoadRunner plugins. +type Temporal interface { + GetClient() client.Client + GetDataConverter() converter.DataConverter + GetConfig() Config + GetCodec() rrt.Codec + CreateWorker(taskQueue string, options worker.Options) (worker.Worker, error) +} + +// Config of the temporal client and depended services. +type Config struct { + Address string + Namespace string + Activities *pool.Config + Codec string + DebugLevel int `mapstructure:"debug_level"` + CacheSize int `mapstructure:"cache_size"` +} + +// Init initiates temporal client plugin. +func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { + const op = errors.Op("temporal_client_plugin_init") + p.log = log + p.dc = rrt.NewDataConverter(converter.GetDefaultDataConverter()) + err := cfg.UnmarshalKey(PluginName, &p.cfg) + if err != nil { + return errors.E(op, err) + } + if p.cfg == nil { + return errors.E(op, errors.Disabled) + } + + return nil +} + +// GetConfig returns temporal configuration. +func (p *Plugin) GetConfig() Config { + if p.cfg != nil { + return *p.cfg + } + // empty + return Config{} +} + +// GetCodec returns communication codec. +func (p *Plugin) GetCodec() rrt.Codec { + if p.cfg.Codec == "json" { + return rrt.NewJSONCodec(rrt.DebugLevel(p.cfg.DebugLevel), p.log) + } + + // production ready protocol, no debug abilities + return rrt.NewProtoCodec() +} + +// GetDataConverter returns data active data converter. +func (p *Plugin) GetDataConverter() converter.DataConverter { + return p.dc +} + +// Serve starts temporal srv. +func (p *Plugin) Serve() chan error { + const op = errors.Op("temporal_client_plugin_serve") + errCh := make(chan error, 1) + var err error + + if stickyCacheSet == false && p.cfg.CacheSize != 0 { + worker.SetStickyWorkflowCacheSize(p.cfg.CacheSize) + stickyCacheSet = true + } + + p.client, err = client.NewClient(client.Options{ + Logger: p.log, + HostPort: p.cfg.Address, + Namespace: p.cfg.Namespace, + DataConverter: p.dc, + }) + + if err != nil { + errCh <- errors.E(op, err) + } + + p.log.Debug("connected to temporal server", "address", p.cfg.Address) + + return errCh +} + +// Stop stops temporal srv connection. +func (p *Plugin) Stop() error { + if p.client != nil { + p.client.Close() + } + + return nil +} + +// GetClient returns active srv connection. +func (p *Plugin) GetClient() client.Client { + return p.client +} + +// CreateWorker allocates new temporal worker on an active connection. +func (p *Plugin) CreateWorker(tq string, options worker.Options) (worker.Worker, error) { + const op = errors.Op("temporal_client_plugin_create_worker") + if p.client == nil { + return nil, errors.E(op, errors.Str("unable to create worker, invalid temporal client")) + } + + if options.Identity == "" { + if tq == "" { + tq = client.DefaultNamespace + } + + // ensures unique worker IDs + options.Identity = fmt.Sprintf( + "%d@%s@%s@%v", + os.Getpid(), + getHostName(), + tq, + atomic.AddInt32(&p.workerID, 1), + ) + } + + return worker.New(p.client, tq, options), nil +} + +// Name of the service. +func (p *Plugin) Name() string { + return PluginName +} + +func getHostName() string { + hostName, err := os.Hostname() + if err != nil { + hostName = "Unknown" + } + return hostName +} diff --git a/plugins/temporal/protocol/converter.go b/plugins/temporal/protocol/converter.go new file mode 100644 index 00000000..406e70f4 --- /dev/null +++ b/plugins/temporal/protocol/converter.go @@ -0,0 +1,76 @@ +package protocol + +import ( + commonpb "go.temporal.io/api/common/v1" + "go.temporal.io/sdk/converter" +) + +type ( + // DataConverter wraps Temporal data converter to enable direct access to the payloads. + DataConverter struct { + fallback converter.DataConverter + } +) + +// NewDataConverter creates new data converter. +func NewDataConverter(fallback converter.DataConverter) converter.DataConverter { + return &DataConverter{fallback: fallback} +} + +// ToPayloads converts a list of values. +func (r *DataConverter) ToPayloads(values ...interface{}) (*commonpb.Payloads, error) { + for _, v := range values { + if aggregated, ok := v.(*commonpb.Payloads); ok { + // bypassing + return aggregated, nil + } + } + + return r.fallback.ToPayloads(values...) +} + +// ToPayload converts single value to payload. +func (r *DataConverter) ToPayload(value interface{}) (*commonpb.Payload, error) { + return r.fallback.ToPayload(value) +} + +// FromPayloads converts to a list of values of different types. +// Useful for deserializing arguments of function invocations. +func (r *DataConverter) FromPayloads(payloads *commonpb.Payloads, valuePtrs ...interface{}) error { + if payloads == nil { + return nil + } + + if len(valuePtrs) == 1 { + // input proxying + if input, ok := valuePtrs[0].(**commonpb.Payloads); ok { + *input = &commonpb.Payloads{} + (*input).Payloads = payloads.Payloads + return nil + } + } + + for i := 0; i < len(payloads.Payloads); i++ { + err := r.FromPayload(payloads.Payloads[i], valuePtrs[i]) + if err != nil { + return err + } + } + + return nil +} + +// FromPayload converts single value from payload. +func (r *DataConverter) FromPayload(payload *commonpb.Payload, valuePtr interface{}) error { + return r.fallback.FromPayload(payload, valuePtr) +} + +// ToString converts payload object into human readable string. +func (r *DataConverter) ToString(input *commonpb.Payload) string { + return r.fallback.ToString(input) +} + +// ToStrings converts payloads object into human readable strings. +func (r *DataConverter) ToStrings(input *commonpb.Payloads) []string { + return r.fallback.ToStrings(input) +} diff --git a/plugins/temporal/protocol/converter_test.go b/plugins/temporal/protocol/converter_test.go new file mode 100644 index 00000000..6ce9fa0f --- /dev/null +++ b/plugins/temporal/protocol/converter_test.go @@ -0,0 +1,23 @@ +package protocol + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "go.temporal.io/api/common/v1" + "go.temporal.io/sdk/converter" +) + +func Test_Passthough(t *testing.T) { + codec := NewDataConverter(converter.GetDefaultDataConverter()) + + value, err := codec.ToPayloads("test") + assert.NoError(t, err) + + out := &common.Payloads{} + + assert.Len(t, out.Payloads, 0) + assert.NoError(t, codec.FromPayloads(value, &out)) + + assert.Len(t, out.Payloads, 1) +} diff --git a/plugins/temporal/protocol/internal/protocol.pb.go b/plugins/temporal/protocol/internal/protocol.pb.go new file mode 100644 index 00000000..c554e28f --- /dev/null +++ b/plugins/temporal/protocol/internal/protocol.pb.go @@ -0,0 +1,167 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: protocol.proto + +package internal + +import ( + fmt "fmt" + math "math" + + proto "github.com/golang/protobuf/proto" + v11 "go.temporal.io/api/common/v1" + v1 "go.temporal.io/api/failure/v1" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +type Frame struct { + Messages []*Message `protobuf:"bytes,1,rep,name=messages,proto3" json:"messages,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Frame) Reset() { *m = Frame{} } +func (m *Frame) String() string { return proto.CompactTextString(m) } +func (*Frame) ProtoMessage() {} +func (*Frame) Descriptor() ([]byte, []int) { + return fileDescriptor_2bc2336598a3f7e0, []int{0} +} + +func (m *Frame) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Frame.Unmarshal(m, b) +} +func (m *Frame) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Frame.Marshal(b, m, deterministic) +} +func (m *Frame) XXX_Merge(src proto.Message) { + xxx_messageInfo_Frame.Merge(m, src) +} +func (m *Frame) XXX_Size() int { + return xxx_messageInfo_Frame.Size(m) +} +func (m *Frame) XXX_DiscardUnknown() { + xxx_messageInfo_Frame.DiscardUnknown(m) +} + +var xxx_messageInfo_Frame proto.InternalMessageInfo + +func (m *Frame) GetMessages() []*Message { + if m != nil { + return m.Messages + } + return nil +} + +// Single communication message. +type Message struct { + Id uint64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` + // command name (if any) + Command string `protobuf:"bytes,2,opt,name=command,proto3" json:"command,omitempty"` + // command options in json format. + Options []byte `protobuf:"bytes,3,opt,name=options,proto3" json:"options,omitempty"` + // error response. + Failure *v1.Failure `protobuf:"bytes,4,opt,name=failure,proto3" json:"failure,omitempty"` + // invocation or result payloads. + Payloads *v11.Payloads `protobuf:"bytes,5,opt,name=payloads,proto3" json:"payloads,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Message) Reset() { *m = Message{} } +func (m *Message) String() string { return proto.CompactTextString(m) } +func (*Message) ProtoMessage() {} +func (*Message) Descriptor() ([]byte, []int) { + return fileDescriptor_2bc2336598a3f7e0, []int{1} +} + +func (m *Message) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Message.Unmarshal(m, b) +} +func (m *Message) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Message.Marshal(b, m, deterministic) +} +func (m *Message) XXX_Merge(src proto.Message) { + xxx_messageInfo_Message.Merge(m, src) +} +func (m *Message) XXX_Size() int { + return xxx_messageInfo_Message.Size(m) +} +func (m *Message) XXX_DiscardUnknown() { + xxx_messageInfo_Message.DiscardUnknown(m) +} + +var xxx_messageInfo_Message proto.InternalMessageInfo + +func (m *Message) GetId() uint64 { + if m != nil { + return m.Id + } + return 0 +} + +func (m *Message) GetCommand() string { + if m != nil { + return m.Command + } + return "" +} + +func (m *Message) GetOptions() []byte { + if m != nil { + return m.Options + } + return nil +} + +func (m *Message) GetFailure() *v1.Failure { + if m != nil { + return m.Failure + } + return nil +} + +func (m *Message) GetPayloads() *v11.Payloads { + if m != nil { + return m.Payloads + } + return nil +} + +func init() { + proto.RegisterType((*Frame)(nil), "temporal.roadrunner.internal.Frame") + proto.RegisterType((*Message)(nil), "temporal.roadrunner.internal.Message") +} + +func init() { proto.RegisterFile("protocol.proto", fileDescriptor_2bc2336598a3f7e0) } + +var fileDescriptor_2bc2336598a3f7e0 = []byte{ + // 257 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x90, 0x41, 0x4b, 0x33, 0x31, + 0x10, 0x86, 0xc9, 0xb6, 0xfd, 0xb6, 0x5f, 0x2a, 0x3d, 0xe4, 0x14, 0x8a, 0x87, 0x50, 0x2c, 0xec, + 0x29, 0x4b, 0xeb, 0x4d, 0xbc, 0x78, 0xe9, 0x41, 0x10, 0x24, 0xff, 0x60, 0x6c, 0xa2, 0x04, 0x92, + 0x4c, 0x48, 0xb6, 0x05, 0x7f, 0xa1, 0x7f, 0x4b, 0xb6, 0x9b, 0xad, 0x88, 0xe2, 0x6d, 0x26, 0xef, + 0xfb, 0x84, 0x87, 0xa1, 0xcb, 0x98, 0xb0, 0xc3, 0x03, 0x3a, 0x79, 0x1e, 0xd8, 0x75, 0x67, 0x7c, + 0xc4, 0x04, 0x4e, 0x26, 0x04, 0x9d, 0x8e, 0x21, 0x98, 0x24, 0x6d, 0xe8, 0x4c, 0x0a, 0xe0, 0x56, + 0x37, 0x63, 0xda, 0x42, 0xb4, 0xed, 0x01, 0xbd, 0xc7, 0xd0, 0x9e, 0xb6, 0xad, 0x37, 0x39, 0xc3, + 0x9b, 0x19, 0xfe, 0x58, 0x6d, 0xbe, 0xb5, 0x5e, 0xc1, 0xba, 0x63, 0x32, 0x3f, 0x6a, 0xeb, 0x47, + 0x3a, 0xdb, 0x27, 0xf0, 0x86, 0x3d, 0xd0, 0x79, 0x49, 0x32, 0x27, 0x62, 0xd2, 0x2c, 0x76, 0x1b, + 0xf9, 0x97, 0x86, 0x7c, 0x1a, 0xda, 0xea, 0x82, 0xad, 0x3f, 0x08, 0xad, 0xcb, 0x2b, 0x5b, 0xd2, + 0xca, 0x6a, 0x4e, 0x04, 0x69, 0xa6, 0xaa, 0xb2, 0x9a, 0x71, 0x5a, 0xf7, 0xa6, 0x10, 0x34, 0xaf, + 0x04, 0x69, 0xfe, 0xab, 0x71, 0xed, 0x13, 0x8c, 0x9d, 0xc5, 0x90, 0xf9, 0x44, 0x90, 0xe6, 0x4a, + 0x8d, 0x2b, 0xbb, 0xa3, 0x75, 0xf1, 0xe6, 0x53, 0x41, 0x9a, 0xc5, 0x4e, 0x7c, 0x19, 0x41, 0xb4, + 0xb2, 0x84, 0xf2, 0xb4, 0x95, 0xfb, 0x61, 0x54, 0x23, 0xc0, 0xee, 0xe9, 0x3c, 0xc2, 0xbb, 0x43, + 0xd0, 0x99, 0xcf, 0x7e, 0x83, 0x87, 0xbb, 0xf5, 0xec, 0x73, 0xe9, 0xa9, 0x0b, 0xf1, 0xf2, 0xef, + 0x7c, 0x9c, 0xdb, 0xcf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x59, 0xb1, 0x79, 0xd4, 0x99, 0x01, 0x00, + 0x00, +} diff --git a/plugins/temporal/protocol/json_codec.go b/plugins/temporal/protocol/json_codec.go new file mode 100644 index 00000000..e7a77068 --- /dev/null +++ b/plugins/temporal/protocol/json_codec.go @@ -0,0 +1,225 @@ +package protocol + +import ( + "github.com/fatih/color" + j "github.com/json-iterator/go" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/plugins/logger" + "go.temporal.io/api/common/v1" + "go.temporal.io/api/failure/v1" +) + +var json = j.ConfigCompatibleWithStandardLibrary + +// JSONCodec can be used for debugging and log capturing reasons. +type JSONCodec struct { + // level enables verbose logging or all incoming and outcoming messages. + level DebugLevel + + // logger renders messages when debug enabled. + logger logger.Logger +} + +// jsonFrame contains message command in binary form. +type jsonFrame struct { + // ID contains ID of the command, response or error. + ID uint64 `json:"id"` + + // Command name. Optional. + Command string `json:"command,omitempty"` + + // Options to be unmarshalled to body (raw payload). + Options j.RawMessage `json:"options,omitempty"` + + // Failure associated with command id. + Failure []byte `json:"failure,omitempty"` + + // Payloads specific to the command or result. + Payloads []byte `json:"payloads,omitempty"` +} + +// NewJSONCodec creates new Json communication codec. +func NewJSONCodec(level DebugLevel, logger logger.Logger) Codec { + return &JSONCodec{ + level: level, + logger: logger, + } +} + +// WithLogger creates new codes instance with attached logger. +func (c *JSONCodec) WithLogger(logger logger.Logger) Codec { + return &JSONCodec{ + level: c.level, + logger: logger, + } +} + +// GetName returns codec name. +func (c *JSONCodec) GetName() string { + return "json" +} + +// Execute exchanges commands with worker. +func (c *JSONCodec) Execute(e Endpoint, ctx Context, msg ...Message) ([]Message, error) { + const op = errors.Op("json_codec_execute") + if len(msg) == 0 { + return nil, nil + } + + var response = make([]jsonFrame, 0, 5) + var result = make([]Message, 0, 5) + var err error + + frames := make([]jsonFrame, 0, len(msg)) + for _, m := range msg { + frame, err := c.packFrame(m) + if err != nil { + return nil, errors.E(op, err) + } + + frames = append(frames, frame) + } + + p := payload.Payload{} + + if ctx.IsEmpty() { + p.Context = []byte("null") + } + + p.Context, err = json.Marshal(ctx) + if err != nil { + return nil, errors.E(op, err) + } + + p.Body, err = json.Marshal(frames) + if err != nil { + return nil, errors.E(op, err) + } + + if c.level >= DebugNormal { + logMessage := string(p.Body) + " " + string(p.Context) + if c.level >= DebugHumanized { + logMessage = color.GreenString(logMessage) + } + + c.logger.Debug(logMessage) + } + + out, err := e.Exec(p) + if err != nil { + return nil, errors.E(op, err) + } + + if len(out.Body) == 0 { + // worker inactive or closed + return nil, nil + } + + if c.level >= DebugNormal { + logMessage := string(out.Body) + if c.level >= DebugHumanized { + logMessage = color.HiYellowString(logMessage) + } + + c.logger.Debug(logMessage, "receive", true) + } + + err = json.Unmarshal(out.Body, &response) + if err != nil { + return nil, errors.E(op, err) + } + + for _, f := range response { + msg, err := c.parseFrame(f) + if err != nil { + return nil, errors.E(op, err) + } + + result = append(result, msg) + } + + return result, nil +} + +func (c *JSONCodec) packFrame(msg Message) (jsonFrame, error) { + var ( + err error + frame jsonFrame + ) + + frame.ID = msg.ID + + if msg.Payloads != nil { + frame.Payloads, err = msg.Payloads.Marshal() + if err != nil { + return jsonFrame{}, err + } + } + + if msg.Failure != nil { + frame.Failure, err = msg.Failure.Marshal() + if err != nil { + return jsonFrame{}, err + } + } + + if msg.Command == nil { + return frame, nil + } + + frame.Command, err = commandName(msg.Command) + if err != nil { + return jsonFrame{}, err + } + + frame.Options, err = json.Marshal(msg.Command) + if err != nil { + return jsonFrame{}, err + } + + return frame, nil +} + +func (c *JSONCodec) parseFrame(frame jsonFrame) (Message, error) { + var ( + err error + msg Message + ) + + msg.ID = frame.ID + + if frame.Payloads != nil { + msg.Payloads = &common.Payloads{} + + err = msg.Payloads.Unmarshal(frame.Payloads) + if err != nil { + return Message{}, err + } + } + + if frame.Failure != nil { + msg.Failure = &failure.Failure{} + + err = msg.Failure.Unmarshal(frame.Failure) + if err != nil { + return Message{}, err + } + } + + if frame.Command != "" { + cmd, err := initCommand(frame.Command) + if err != nil { + return Message{}, err + } + + err = json.Unmarshal(frame.Options, &cmd) + if err != nil { + return Message{}, err + } + + msg.Command = cmd + } + + return msg, nil +} diff --git a/plugins/temporal/protocol/message.go b/plugins/temporal/protocol/message.go new file mode 100644 index 00000000..d5e0f49d --- /dev/null +++ b/plugins/temporal/protocol/message.go @@ -0,0 +1,334 @@ +package protocol + +import ( + "time" + + "github.com/spiral/errors" + commonpb "go.temporal.io/api/common/v1" + "go.temporal.io/sdk/activity" + bindings "go.temporal.io/sdk/internalbindings" + "go.temporal.io/sdk/workflow" +) + +const ( + getWorkerInfoCommand = "GetWorkerInfo" + + invokeActivityCommand = "InvokeActivity" + startWorkflowCommand = "StartWorkflow" + invokeSignalCommand = "InvokeSignal" + invokeQueryCommand = "InvokeQuery" + destroyWorkflowCommand = "DestroyWorkflow" + cancelWorkflowCommand = "CancelWorkflow" + getStackTraceCommand = "StackTrace" + + executeActivityCommand = "ExecuteActivity" + executeChildWorkflowCommand = "ExecuteChildWorkflow" + getChildWorkflowExecutionCommand = "GetChildWorkflowExecution" + + newTimerCommand = "NewTimer" + sideEffectCommand = "SideEffect" + getVersionCommand = "GetVersion" + completeWorkflowCommand = "CompleteWorkflow" + continueAsNewCommand = "ContinueAsNew" + + signalExternalWorkflowCommand = "SignalExternalWorkflow" + cancelExternalWorkflowCommand = "CancelExternalWorkflow" + + cancelCommand = "Cancel" + panicCommand = "Panic" +) + +// GetWorkerInfo reads worker information. +type GetWorkerInfo struct{} + +// InvokeActivity invokes activity. +type InvokeActivity struct { + // Name defines activity name. + Name string `json:"name"` + + // Info contains execution context. + Info activity.Info `json:"info"` + + // HeartbeatDetails indicates that the payload also contains last heartbeat details. + HeartbeatDetails int `json:"heartbeatDetails,omitempty"` +} + +// StartWorkflow sends worker command to start workflow. +type StartWorkflow struct { + // Info to define workflow context. + Info *workflow.Info `json:"info"` + + // LastCompletion contains offset of last completion results. + LastCompletion int `json:"lastCompletion,omitempty"` +} + +// InvokeSignal invokes signal with a set of arguments. +type InvokeSignal struct { + // RunID workflow run id. + RunID string `json:"runId"` + + // Name of the signal. + Name string `json:"name"` +} + +// InvokeQuery invokes query with a set of arguments. +type InvokeQuery struct { + // RunID workflow run id. + RunID string `json:"runId"` + // Name of the query. + Name string `json:"name"` +} + +// CancelWorkflow asks worker to gracefully stop workflow, if possible (signal). +type CancelWorkflow struct { + // RunID workflow run id. + RunID string `json:"runId"` +} + +// DestroyWorkflow asks worker to offload workflow from memory. +type DestroyWorkflow struct { + // RunID workflow run id. + RunID string `json:"runId"` +} + +// GetStackTrace asks worker to offload workflow from memory. +type GetStackTrace struct { + // RunID workflow run id. + RunID string `json:"runId"` +} + +// ExecuteActivity command by workflow worker. +type ExecuteActivity struct { + // Name defines activity name. + Name string `json:"name"` + // Options to run activity. + Options bindings.ExecuteActivityOptions `json:"options,omitempty"` +} + +// ExecuteChildWorkflow executes child workflow. +type ExecuteChildWorkflow struct { + // Name defines workflow name. + Name string `json:"name"` + // Options to run activity. + Options bindings.WorkflowOptions `json:"options,omitempty"` +} + +// GetChildWorkflowExecution returns the WorkflowID and RunId of child workflow. +type GetChildWorkflowExecution struct { + // ID of child workflow command. + ID uint64 `json:"id"` +} + +// NewTimer starts new timer. +type NewTimer struct { + // Milliseconds defines timer duration. + Milliseconds int `json:"ms"` +} + +// SideEffect to be recorded into the history. +type SideEffect struct{} + +// GetVersion requests version marker. +type GetVersion struct { + ChangeID string `json:"changeID"` + MinSupported int `json:"minSupported"` + MaxSupported int `json:"maxSupported"` +} + +// CompleteWorkflow sent by worker to complete workflow. Might include additional error as part of the payload. +type CompleteWorkflow struct{} + +// ContinueAsNew restarts workflow with new running instance. +type ContinueAsNew struct { + // Result defines workflow execution result. + Name string `json:"name"` + + // Options for continued as new workflow. + Options struct { + TaskQueueName string + WorkflowExecutionTimeout time.Duration + WorkflowRunTimeout time.Duration + WorkflowTaskTimeout time.Duration + } `json:"options"` +} + +// SignalExternalWorkflow sends signal to external workflow. +type SignalExternalWorkflow struct { + Namespace string `json:"namespace"` + WorkflowID string `json:"workflowID"` + RunID string `json:"runID"` + Signal string `json:"signal"` + ChildWorkflowOnly bool `json:"childWorkflowOnly"` +} + +// CancelExternalWorkflow canceller external workflow. +type CancelExternalWorkflow struct { + Namespace string `json:"namespace"` + WorkflowID string `json:"workflowID"` + RunID string `json:"runID"` +} + +// Cancel one or multiple internal promises (activities, local activities, timers, child workflows). +type Cancel struct { + // CommandIDs to be cancelled. + CommandIDs []uint64 `json:"ids"` +} + +// Panic triggers panic in workflow process. +type Panic struct { + // Message to include into the error. + Message string `json:"message"` +} + +// ActivityParams maps activity command to activity params. +func (cmd ExecuteActivity) ActivityParams(env bindings.WorkflowEnvironment, payloads *commonpb.Payloads) bindings.ExecuteActivityParams { + params := bindings.ExecuteActivityParams{ + ExecuteActivityOptions: cmd.Options, + ActivityType: bindings.ActivityType{Name: cmd.Name}, + Input: payloads, + } + + if params.TaskQueueName == "" { + params.TaskQueueName = env.WorkflowInfo().TaskQueueName + } + + return params +} + +// WorkflowParams maps workflow command to workflow params. +func (cmd ExecuteChildWorkflow) WorkflowParams(env bindings.WorkflowEnvironment, payloads *commonpb.Payloads) bindings.ExecuteWorkflowParams { + params := bindings.ExecuteWorkflowParams{ + WorkflowOptions: cmd.Options, + WorkflowType: &bindings.WorkflowType{Name: cmd.Name}, + Input: payloads, + } + + if params.TaskQueueName == "" { + params.TaskQueueName = env.WorkflowInfo().TaskQueueName + } + + return params +} + +// ToDuration converts timer command to time.Duration. +func (cmd NewTimer) ToDuration() time.Duration { + return time.Millisecond * time.Duration(cmd.Milliseconds) +} + +// returns command name (only for the commands sent to the worker) +func commandName(cmd interface{}) (string, error) { + const op = errors.Op("command_name") + switch cmd.(type) { + case GetWorkerInfo, *GetWorkerInfo: + return getWorkerInfoCommand, nil + case StartWorkflow, *StartWorkflow: + return startWorkflowCommand, nil + case InvokeSignal, *InvokeSignal: + return invokeSignalCommand, nil + case InvokeQuery, *InvokeQuery: + return invokeQueryCommand, nil + case DestroyWorkflow, *DestroyWorkflow: + return destroyWorkflowCommand, nil + case CancelWorkflow, *CancelWorkflow: + return cancelWorkflowCommand, nil + case GetStackTrace, *GetStackTrace: + return getStackTraceCommand, nil + case InvokeActivity, *InvokeActivity: + return invokeActivityCommand, nil + case ExecuteActivity, *ExecuteActivity: + return executeActivityCommand, nil + case ExecuteChildWorkflow, *ExecuteChildWorkflow: + return executeChildWorkflowCommand, nil + case GetChildWorkflowExecution, *GetChildWorkflowExecution: + return getChildWorkflowExecutionCommand, nil + case NewTimer, *NewTimer: + return newTimerCommand, nil + case GetVersion, *GetVersion: + return getVersionCommand, nil + case SideEffect, *SideEffect: + return sideEffectCommand, nil + case CompleteWorkflow, *CompleteWorkflow: + return completeWorkflowCommand, nil + case ContinueAsNew, *ContinueAsNew: + return continueAsNewCommand, nil + case SignalExternalWorkflow, *SignalExternalWorkflow: + return signalExternalWorkflowCommand, nil + case CancelExternalWorkflow, *CancelExternalWorkflow: + return cancelExternalWorkflowCommand, nil + case Cancel, *Cancel: + return cancelCommand, nil + case Panic, *Panic: + return panicCommand, nil + default: + return "", errors.E(op, errors.Errorf("undefined command type: %s", cmd)) + } +} + +// reads command from binary payload +func initCommand(name string) (interface{}, error) { + const op = errors.Op("init_command") + switch name { + case getWorkerInfoCommand: + return &GetWorkerInfo{}, nil + + case startWorkflowCommand: + return &StartWorkflow{}, nil + + case invokeSignalCommand: + return &InvokeSignal{}, nil + + case invokeQueryCommand: + return &InvokeQuery{}, nil + + case destroyWorkflowCommand: + return &DestroyWorkflow{}, nil + + case cancelWorkflowCommand: + return &CancelWorkflow{}, nil + + case getStackTraceCommand: + return &GetStackTrace{}, nil + + case invokeActivityCommand: + return &InvokeActivity{}, nil + + case executeActivityCommand: + return &ExecuteActivity{}, nil + + case executeChildWorkflowCommand: + return &ExecuteChildWorkflow{}, nil + + case getChildWorkflowExecutionCommand: + return &GetChildWorkflowExecution{}, nil + + case newTimerCommand: + return &NewTimer{}, nil + + case getVersionCommand: + return &GetVersion{}, nil + + case sideEffectCommand: + return &SideEffect{}, nil + + case completeWorkflowCommand: + return &CompleteWorkflow{}, nil + + case continueAsNewCommand: + return &ContinueAsNew{}, nil + + case signalExternalWorkflowCommand: + return &SignalExternalWorkflow{}, nil + + case cancelExternalWorkflowCommand: + return &CancelExternalWorkflow{}, nil + + case cancelCommand: + return &Cancel{}, nil + + case panicCommand: + return &Panic{}, nil + + default: + return nil, errors.E(op, errors.Errorf("undefined command name: %s", name)) + } +} diff --git a/plugins/temporal/protocol/proto_codec.go b/plugins/temporal/protocol/proto_codec.go new file mode 100644 index 00000000..607fe0fe --- /dev/null +++ b/plugins/temporal/protocol/proto_codec.go @@ -0,0 +1,145 @@ +package protocol + +import ( + v1 "github.com/golang/protobuf/proto" //nolint:staticcheck + jsoniter "github.com/json-iterator/go" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/temporal/protocol/internal" + "google.golang.org/protobuf/proto" +) + +type ( + // ProtoCodec uses protobuf to exchange messages with underlying workers. + ProtoCodec struct { + } +) + +// NewProtoCodec creates new Proto communication codec. +func NewProtoCodec() Codec { + return &ProtoCodec{} +} + +// WithLogger creates new codes instance with attached logger. +func (c *ProtoCodec) WithLogger(logger logger.Logger) Codec { + return &ProtoCodec{} +} + +// GetName returns codec name. +func (c *ProtoCodec) GetName() string { + return "protobuf" +} + +// Execute exchanges commands with worker. +func (c *ProtoCodec) Execute(e Endpoint, ctx Context, msg ...Message) ([]Message, error) { + if len(msg) == 0 { + return nil, nil + } + + var request = &internal.Frame{} + var response = &internal.Frame{} + var result = make([]Message, 0, 5) + var err error + + for _, m := range msg { + frame, err := c.packMessage(m) + if err != nil { + return nil, err + } + + request.Messages = append(request.Messages, frame) + } + + p := payload.Payload{} + + // context is always in json format + if ctx.IsEmpty() { + p.Context = []byte("null") + } + + p.Context, err = jsoniter.Marshal(ctx) + if err != nil { + return nil, errors.E(errors.Op("encodeContext"), err) + } + + p.Body, err = proto.Marshal(v1.MessageV2(request)) + if err != nil { + return nil, errors.E(errors.Op("encodePayload"), err) + } + + out, err := e.Exec(p) + if err != nil { + return nil, errors.E(errors.Op("execute"), err) + } + + if len(out.Body) == 0 { + // worker inactive or closed + return nil, nil + } + + err = proto.Unmarshal(out.Body, v1.MessageV2(response)) + if err != nil { + return nil, errors.E(errors.Op("parseResponse"), err) + } + + for _, f := range response.Messages { + msg, err := c.parseMessage(f) + if err != nil { + return nil, err + } + + result = append(result, msg) + } + + return result, nil +} + +func (c *ProtoCodec) packMessage(msg Message) (*internal.Message, error) { + var err error + + frame := &internal.Message{ + Id: msg.ID, + Payloads: msg.Payloads, + Failure: msg.Failure, + } + + if msg.Command != nil { + frame.Command, err = commandName(msg.Command) + if err != nil { + return nil, err + } + + frame.Options, err = jsoniter.Marshal(msg.Command) + if err != nil { + return nil, err + } + } + + return frame, nil +} + +func (c *ProtoCodec) parseMessage(frame *internal.Message) (Message, error) { + const op = errors.Op("proto_codec_parse_message") + var err error + + msg := Message{ + ID: frame.Id, + Payloads: frame.Payloads, + Failure: frame.Failure, + } + + if frame.Command != "" { + msg.Command, err = initCommand(frame.Command) + if err != nil { + return Message{}, errors.E(op, err) + } + + err = jsoniter.Unmarshal(frame.Options, &msg.Command) + if err != nil { + return Message{}, errors.E(op, err) + } + } + + return msg, nil +} diff --git a/plugins/temporal/protocol/protocol.go b/plugins/temporal/protocol/protocol.go new file mode 100644 index 00000000..53076fdf --- /dev/null +++ b/plugins/temporal/protocol/protocol.go @@ -0,0 +1,77 @@ +package protocol + +import ( + "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/plugins/logger" + commonpb "go.temporal.io/api/common/v1" + "go.temporal.io/api/failure/v1" +) + +const ( + // DebugNone disables all debug messages. + DebugNone = iota + + // DebugNormal renders all messages into console. + DebugNormal + + // DebugHumanized enables color highlights for messages. + DebugHumanized +) + +// Context provides worker information about currently. Context can be empty for server level commands. +type Context struct { + // TaskQueue associates message batch with the specific task queue in underlying worker. + TaskQueue string `json:"taskQueue,omitempty"` + + // TickTime associated current or historical time with message batch. + TickTime string `json:"tickTime,omitempty"` + + // Replay indicates that current message batch is historical. + Replay bool `json:"replay,omitempty"` +} + +// Message used to exchange the send commands and receive responses from underlying workers. +type Message struct { + // ID contains ID of the command, response or error. + ID uint64 `json:"id"` + + // Command of the message in unmarshalled form. Pointer. + Command interface{} `json:"command,omitempty"` + + // Failure associated with command id. + Failure *failure.Failure `json:"failure,omitempty"` + + // Payloads contains message specific payloads in binary format. + Payloads *commonpb.Payloads `json:"payloads,omitempty"` +} + +// Codec manages payload encoding and decoding while communication with underlying worker. +type Codec interface { + // WithLogger creates new codes instance with attached logger. + WithLogger(logger.Logger) Codec + + // GetName returns codec name. + GetName() string + + // Execute sends message to worker and waits for the response. + Execute(e Endpoint, ctx Context, msg ...Message) ([]Message, error) +} + +// Endpoint provides the ability to send and receive messages. +type Endpoint interface { + // ExecWithContext allow to set ExecTTL + Exec(p payload.Payload) (payload.Payload, error) +} + +// DebugLevel configures debug level. +type DebugLevel int + +// IsEmpty only check if task queue set. +func (ctx Context) IsEmpty() bool { + return ctx.TaskQueue == "" +} + +// IsCommand returns true if message carries request. +func (msg Message) IsCommand() bool { + return msg.Command != nil +} diff --git a/plugins/temporal/protocol/worker_info.go b/plugins/temporal/protocol/worker_info.go new file mode 100644 index 00000000..58a0ae66 --- /dev/null +++ b/plugins/temporal/protocol/worker_info.go @@ -0,0 +1,72 @@ +package protocol + +import ( + "github.com/spiral/errors" + "go.temporal.io/sdk/converter" + "go.temporal.io/sdk/worker" +) + +// WorkerInfo outlines information about every available worker and it's TaskQueues. + +// WorkerInfo lists available task queues, workflows and activities. +type WorkerInfo struct { + // TaskQueue assigned to the worker. + TaskQueue string `json:"taskQueue"` + + // Options describe worker options. + Options worker.Options `json:"options,omitempty"` + + // Workflows provided by the worker. + Workflows []WorkflowInfo + + // Activities provided by the worker. + Activities []ActivityInfo +} + +// WorkflowInfo describes single worker workflow. +type WorkflowInfo struct { + // Name of the workflow. + Name string `json:"name"` + + // Queries pre-defined for the workflow type. + Queries []string `json:"queries"` + + // Signals pre-defined for the workflow type. + Signals []string `json:"signals"` +} + +// ActivityInfo describes single worker activity. +type ActivityInfo struct { + // Name describes public activity name. + Name string `json:"name"` +} + +// FetchWorkerInfo fetches information about all underlying workers (can be multiplexed inside single process). +func FetchWorkerInfo(c Codec, e Endpoint, dc converter.DataConverter) ([]WorkerInfo, error) { + const op = errors.Op("fetch_worker_info") + + result, err := c.Execute(e, Context{}, Message{ID: 0, Command: GetWorkerInfo{}}) + if err != nil { + return nil, errors.E(op, err) + } + + if len(result) != 1 { + return nil, errors.E(op, errors.Str("unable to read worker info")) + } + + if result[0].ID != 0 { + return nil, errors.E(op, errors.Str("FetchWorkerInfo confirmation missing")) + } + + var info []WorkerInfo + for i := range result[0].Payloads.Payloads { + wi := WorkerInfo{} + if err := dc.FromPayload(result[0].Payloads.Payloads[i], &wi); err != nil { + return nil, errors.E(op, err) + } + + info = append(info, wi) + } + + return info, nil +} diff --git a/plugins/temporal/workflow/canceller.go b/plugins/temporal/workflow/canceller.go new file mode 100644 index 00000000..962c527f --- /dev/null +++ b/plugins/temporal/workflow/canceller.go @@ -0,0 +1,41 @@ +package workflow + +import ( + "sync" +) + +type cancellable func() error + +type canceller struct { + ids sync.Map +} + +func (c *canceller) register(id uint64, cancel cancellable) { + c.ids.Store(id, cancel) +} + +func (c *canceller) discard(id uint64) { + c.ids.Delete(id) +} + +func (c *canceller) cancel(ids ...uint64) error { + var err error + for _, id := range ids { + cancel, ok := c.ids.Load(id) + if ok == false { + continue + } + + // TODO return when minimum supported version will be go 1.15 + // go1.14 don't have LoadAndDelete method + // It was introduced only in go1.15 + c.ids.Delete(id) + + err = cancel.(cancellable)() + if err != nil { + return err + } + } + + return nil +} diff --git a/plugins/temporal/workflow/canceller_test.go b/plugins/temporal/workflow/canceller_test.go new file mode 100644 index 00000000..d6e846f8 --- /dev/null +++ b/plugins/temporal/workflow/canceller_test.go @@ -0,0 +1,33 @@ +package workflow + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_CancellerNoListeners(t *testing.T) { + c := &canceller{} + + assert.NoError(t, c.cancel(1)) +} + +func Test_CancellerListenerError(t *testing.T) { + c := &canceller{} + c.register(1, func() error { + return errors.New("failed") + }) + + assert.Error(t, c.cancel(1)) +} + +func Test_CancellerListenerDiscarded(t *testing.T) { + c := &canceller{} + c.register(1, func() error { + return errors.New("failed") + }) + + c.discard(1) + assert.NoError(t, c.cancel(1)) +} diff --git a/plugins/temporal/workflow/id_registry.go b/plugins/temporal/workflow/id_registry.go new file mode 100644 index 00000000..ac75cbda --- /dev/null +++ b/plugins/temporal/workflow/id_registry.go @@ -0,0 +1,51 @@ +package workflow + +import ( + "sync" + + bindings "go.temporal.io/sdk/internalbindings" +) + +// used to gain access to child workflow ids after they become available via callback result. +type idRegistry struct { + mu sync.Mutex + ids map[uint64]entry + listeners map[uint64]listener +} + +type listener func(w bindings.WorkflowExecution, err error) + +type entry struct { + w bindings.WorkflowExecution + err error +} + +func newIDRegistry() *idRegistry { + return &idRegistry{ + ids: map[uint64]entry{}, + listeners: map[uint64]listener{}, + } +} + +func (c *idRegistry) listen(id uint64, cl listener) { + c.mu.Lock() + defer c.mu.Unlock() + + c.listeners[id] = cl + + if e, ok := c.ids[id]; ok { + cl(e.w, e.err) + } +} + +func (c *idRegistry) push(id uint64, w bindings.WorkflowExecution, err error) { + c.mu.Lock() + defer c.mu.Unlock() + + e := entry{w: w, err: err} + c.ids[id] = e + + if l, ok := c.listeners[id]; ok { + l(e.w, e.err) + } +} diff --git a/plugins/temporal/workflow/message_queue.go b/plugins/temporal/workflow/message_queue.go new file mode 100644 index 00000000..8f4409d1 --- /dev/null +++ b/plugins/temporal/workflow/message_queue.go @@ -0,0 +1,47 @@ +package workflow + +import ( + rrt "github.com/spiral/roadrunner/v2/plugins/temporal/protocol" + "go.temporal.io/api/common/v1" + "go.temporal.io/api/failure/v1" +) + +type messageQueue struct { + seqID func() uint64 + queue []rrt.Message +} + +func newMessageQueue(sedID func() uint64) *messageQueue { + return &messageQueue{ + seqID: sedID, + queue: make([]rrt.Message, 0, 5), + } +} + +func (mq *messageQueue) flush() { + mq.queue = mq.queue[0:0] +} + +func (mq *messageQueue) allocateMessage(cmd interface{}, payloads *common.Payloads) (uint64, rrt.Message) { + msg := rrt.Message{ + ID: mq.seqID(), + Command: cmd, + Payloads: payloads, + } + + return msg.ID, msg +} + +func (mq *messageQueue) pushCommand(cmd interface{}, payloads *common.Payloads) uint64 { + id, msg := mq.allocateMessage(cmd, payloads) + mq.queue = append(mq.queue, msg) + return id +} + +func (mq *messageQueue) pushResponse(id uint64, payloads *common.Payloads) { + mq.queue = append(mq.queue, rrt.Message{ID: id, Payloads: payloads}) +} + +func (mq *messageQueue) pushError(id uint64, failure *failure.Failure) { + mq.queue = append(mq.queue, rrt.Message{ID: id, Failure: failure}) +} diff --git a/plugins/temporal/workflow/message_queue_test.go b/plugins/temporal/workflow/message_queue_test.go new file mode 100644 index 00000000..1fcd409f --- /dev/null +++ b/plugins/temporal/workflow/message_queue_test.go @@ -0,0 +1,53 @@ +package workflow + +import ( + "sync/atomic" + "testing" + + "github.com/spiral/roadrunner/v2/plugins/temporal/protocol" + "github.com/stretchr/testify/assert" + "go.temporal.io/api/common/v1" + "go.temporal.io/api/failure/v1" +) + +func Test_MessageQueueFlushError(t *testing.T) { + var index uint64 + mq := newMessageQueue(func() uint64 { + return atomic.AddUint64(&index, 1) + }) + + mq.pushError(1, &failure.Failure{}) + assert.Len(t, mq.queue, 1) + + mq.flush() + assert.Len(t, mq.queue, 0) + assert.Equal(t, uint64(0), index) +} + +func Test_MessageQueueFlushResponse(t *testing.T) { + var index uint64 + mq := newMessageQueue(func() uint64 { + return atomic.AddUint64(&index, 1) + }) + + mq.pushResponse(1, &common.Payloads{}) + assert.Len(t, mq.queue, 1) + + mq.flush() + assert.Len(t, mq.queue, 0) + assert.Equal(t, uint64(0), index) +} + +func Test_MessageQueueCommandID(t *testing.T) { + var index uint64 + mq := newMessageQueue(func() uint64 { + return atomic.AddUint64(&index, 1) + }) + + n := mq.pushCommand(protocol.StartWorkflow{}, &common.Payloads{}) + assert.Equal(t, n, index) + assert.Len(t, mq.queue, 1) + + mq.flush() + assert.Len(t, mq.queue, 0) +} diff --git a/plugins/temporal/workflow/plugin.go b/plugins/temporal/workflow/plugin.go new file mode 100644 index 00000000..572d9a3b --- /dev/null +++ b/plugins/temporal/workflow/plugin.go @@ -0,0 +1,203 @@ +package workflow + +import ( + "context" + "sync" + "sync/atomic" + "time" + + "github.com/cenkalti/backoff/v4" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/worker" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/server" + "github.com/spiral/roadrunner/v2/plugins/temporal/client" +) + +const ( + // PluginName defines public service name. + PluginName = "workflows" + + // RRMode sets as RR_MODE env variable to let worker know about the mode to run. + RRMode = "temporal/workflow" +) + +// Plugin manages workflows and workers. +type Plugin struct { + temporal client.Temporal + events events.Handler + server server.Server + log logger.Logger + mu sync.Mutex + reset chan struct{} + pool workflowPool + closing int64 +} + +// Init workflow plugin. +func (p *Plugin) Init(temporal client.Temporal, server server.Server, log logger.Logger) error { + p.temporal = temporal + p.server = server + p.events = events.NewEventsHandler() + p.log = log + p.reset = make(chan struct{}, 1) + + return nil +} + +// Serve starts workflow service. +func (p *Plugin) Serve() chan error { + const op = errors.Op("workflow_plugin_serve") + errCh := make(chan error, 1) + + pool, err := p.startPool() + if err != nil { + errCh <- errors.E(op, err) + return errCh + } + + p.pool = pool + + go func() { + for { + select { + case <-p.reset: + if atomic.LoadInt64(&p.closing) == 1 { + return + } + + err := p.replacePool() + if err == nil { + continue + } + + bkoff := backoff.NewExponentialBackOff() + bkoff.InitialInterval = time.Second + + err = backoff.Retry(p.replacePool, bkoff) + if err != nil { + errCh <- errors.E(op, err) + } + } + } + }() + + return errCh +} + +// Stop workflow service. +func (p *Plugin) Stop() error { + const op = errors.Op("workflow_plugin_stop") + atomic.StoreInt64(&p.closing, 1) + + pool := p.getPool() + if pool != nil { + p.pool = nil + err := pool.Destroy(context.Background()) + if err != nil { + return errors.E(op, err) + } + return nil + } + + return nil +} + +// Name of the service. +func (p *Plugin) Name() string { + return PluginName +} + +// Workers returns list of available workflow workers. +func (p *Plugin) Workers() []worker.BaseProcess { + p.mu.Lock() + defer p.mu.Unlock() + return p.pool.Workers() +} + +// WorkflowNames returns list of all available workflows. +func (p *Plugin) WorkflowNames() []string { + return p.pool.WorkflowNames() +} + +// Reset resets underlying workflow pool with new copy. +func (p *Plugin) Reset() error { + p.reset <- struct{}{} + + return nil +} + +// AddListener adds event listeners to the service. +func (p *Plugin) poolListener(event interface{}) { + if ev, ok := event.(PoolEvent); ok { + if ev.Event == eventWorkerExit { + if ev.Caused != nil { + p.log.Error("Workflow pool error", "error", ev.Caused) + } + p.reset <- struct{}{} + } + } + + p.events.Push(event) +} + +// AddListener adds event listeners to the service. +func (p *Plugin) startPool() (workflowPool, error) { + const op = errors.Op("workflow_plugin_start_pool") + pool, err := newWorkflowPool( + p.temporal.GetCodec().WithLogger(p.log), + p.poolListener, + p.server, + ) + if err != nil { + return nil, errors.E(op, err) + } + + err = pool.Start(context.Background(), p.temporal) + if err != nil { + return nil, errors.E(op, err) + } + + p.log.Debug("Started workflow processing", "workflows", pool.WorkflowNames()) + + return pool, nil +} + +func (p *Plugin) replacePool() error { + p.mu.Lock() + const op = errors.Op("workflow_plugin_replace_pool") + defer p.mu.Unlock() + + if p.pool != nil { + err := p.pool.Destroy(context.Background()) + p.pool = nil + if err != nil { + p.log.Error( + "Unable to destroy expired workflow pool", + "error", + errors.E(op, err), + ) + return errors.E(op, err) + } + } + + pool, err := p.startPool() + if err != nil { + p.log.Error("Replace workflow pool failed", "error", err) + return errors.E(op, err) + } + + p.pool = pool + p.log.Debug("workflow pool successfully replaced") + + return nil +} + +// getPool returns currently pool. +func (p *Plugin) getPool() workflowPool { + p.mu.Lock() + defer p.mu.Unlock() + + return p.pool +} diff --git a/plugins/temporal/workflow/process.go b/plugins/temporal/workflow/process.go new file mode 100644 index 00000000..45e6885c --- /dev/null +++ b/plugins/temporal/workflow/process.go @@ -0,0 +1,436 @@ +package workflow + +import ( + "strconv" + "sync/atomic" + "time" + + "github.com/spiral/errors" + rrt "github.com/spiral/roadrunner/v2/plugins/temporal/protocol" + commonpb "go.temporal.io/api/common/v1" + bindings "go.temporal.io/sdk/internalbindings" + "go.temporal.io/sdk/workflow" +) + +// wraps single workflow process +type workflowProcess struct { + codec rrt.Codec + pool workflowPool + env bindings.WorkflowEnvironment + header *commonpb.Header + mq *messageQueue + ids *idRegistry + seqID uint64 + runID string + pipeline []rrt.Message + callbacks []func() error + canceller *canceller + inLoop bool +} + +// Execute workflow, bootstraps process. +func (wf *workflowProcess) Execute(env bindings.WorkflowEnvironment, header *commonpb.Header, input *commonpb.Payloads) { + wf.env = env + wf.header = header + wf.seqID = 0 + wf.runID = env.WorkflowInfo().WorkflowExecution.RunID + wf.canceller = &canceller{} + + // sequenceID shared for all worker workflows + wf.mq = newMessageQueue(wf.pool.SeqID) + wf.ids = newIDRegistry() + + env.RegisterCancelHandler(wf.handleCancel) + env.RegisterSignalHandler(wf.handleSignal) + env.RegisterQueryHandler(wf.handleQuery) + + var ( + lastCompletion = bindings.GetLastCompletionResult(env) + lastCompletionOffset = 0 + ) + + if lastCompletion != nil && len(lastCompletion.Payloads) != 0 { + if input == nil { + input = &commonpb.Payloads{Payloads: []*commonpb.Payload{}} + } + + input.Payloads = append(input.Payloads, lastCompletion.Payloads...) + lastCompletionOffset = len(lastCompletion.Payloads) + } + + _ = wf.mq.pushCommand( + rrt.StartWorkflow{ + Info: env.WorkflowInfo(), + LastCompletion: lastCompletionOffset, + }, + input, + ) +} + +// OnWorkflowTaskStarted handles single workflow tick and batch of pipeline from temporal server. +func (wf *workflowProcess) OnWorkflowTaskStarted() { + wf.inLoop = true + defer func() { wf.inLoop = false }() + + var err error + for _, callback := range wf.callbacks { + err = callback() + if err != nil { + panic(err) + } + } + wf.callbacks = nil + + if err := wf.flushQueue(); err != nil { + panic(err) + } + + for len(wf.pipeline) > 0 { + msg := wf.pipeline[0] + wf.pipeline = wf.pipeline[1:] + + if msg.IsCommand() { + err = wf.handleMessage(msg) + } + + if err != nil { + panic(err) + } + } +} + +// StackTrace renders workflow stack trace. +func (wf *workflowProcess) StackTrace() string { + result, err := wf.runCommand( + rrt.GetStackTrace{ + RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID, + }, + nil, + ) + + if err != nil { + return err.Error() + } + + var stacktrace string + err = wf.env.GetDataConverter().FromPayload(result.Payloads.Payloads[0], &stacktrace) + if err != nil { + return err.Error() + } + + return stacktrace +} + +// Close the workflow. +func (wf *workflowProcess) Close() { + // TODO: properly handle errors + // panic(err) + + _ = wf.mq.pushCommand( + rrt.DestroyWorkflow{RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID}, + nil, + ) + + _, _ = wf.discardQueue() +} + +// execution context. +func (wf *workflowProcess) getContext() rrt.Context { + return rrt.Context{ + TaskQueue: wf.env.WorkflowInfo().TaskQueueName, + TickTime: wf.env.Now().Format(time.RFC3339), + Replay: wf.env.IsReplaying(), + } +} + +// schedule cancel command +func (wf *workflowProcess) handleCancel() { + _ = wf.mq.pushCommand( + rrt.CancelWorkflow{RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID}, + nil, + ) +} + +// schedule the signal processing +func (wf *workflowProcess) handleSignal(name string, input *commonpb.Payloads) { + _ = wf.mq.pushCommand( + rrt.InvokeSignal{ + RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID, + Name: name, + }, + input, + ) +} + +// Handle query in blocking mode. +func (wf *workflowProcess) handleQuery(queryType string, queryArgs *commonpb.Payloads) (*commonpb.Payloads, error) { + result, err := wf.runCommand( + rrt.InvokeQuery{ + RunID: wf.runID, + Name: queryType, + }, + queryArgs, + ) + + if err != nil { + return nil, err + } + + if result.Failure != nil { + return nil, bindings.ConvertFailureToError(result.Failure, wf.env.GetDataConverter()) + } + + return result.Payloads, nil +} + +// process incoming command +func (wf *workflowProcess) handleMessage(msg rrt.Message) error { + const op = errors.Op("handleMessage") + var err error + + var ( + id = msg.ID + cmd = msg.Command + payloads = msg.Payloads + ) + + switch cmd := cmd.(type) { + case *rrt.ExecuteActivity: + params := cmd.ActivityParams(wf.env, payloads) + activityID := wf.env.ExecuteActivity(params, wf.createCallback(id)) + + wf.canceller.register(id, func() error { + wf.env.RequestCancelActivity(activityID) + return nil + }) + + case *rrt.ExecuteChildWorkflow: + params := cmd.WorkflowParams(wf.env, payloads) + + // always use deterministic id + if params.WorkflowID == "" { + nextID := atomic.AddUint64(&wf.seqID, 1) + params.WorkflowID = wf.env.WorkflowInfo().WorkflowExecution.RunID + "_" + strconv.Itoa(int(nextID)) + } + + wf.env.ExecuteChildWorkflow(params, wf.createCallback(id), func(r bindings.WorkflowExecution, e error) { + wf.ids.push(id, r, e) + }) + + wf.canceller.register(id, func() error { + wf.env.RequestCancelChildWorkflow(params.Namespace, params.WorkflowID) + return nil + }) + + case *rrt.GetChildWorkflowExecution: + wf.ids.listen(cmd.ID, func(w bindings.WorkflowExecution, err error) { + cl := wf.createCallback(id) + + // TODO rewrite + if err != nil { + panic(err) + } + + p, err := wf.env.GetDataConverter().ToPayloads(w) + if err != nil { + panic(err) + } + + cl(p, err) + }) + + case *rrt.NewTimer: + timerID := wf.env.NewTimer(cmd.ToDuration(), wf.createCallback(id)) + wf.canceller.register(id, func() error { + if timerID != nil { + wf.env.RequestCancelTimer(*timerID) + } + return nil + }) + + case *rrt.GetVersion: + version := wf.env.GetVersion( + cmd.ChangeID, + workflow.Version(cmd.MinSupported), + workflow.Version(cmd.MaxSupported), + ) + + result, err := wf.env.GetDataConverter().ToPayloads(version) + if err != nil { + return errors.E(op, err) + } + + wf.mq.pushResponse(id, result) + err = wf.flushQueue() + if err != nil { + panic(err) + } + + case *rrt.SideEffect: + wf.env.SideEffect( + func() (*commonpb.Payloads, error) { + return payloads, nil + }, + wf.createContinuableCallback(id), + ) + + case *rrt.CompleteWorkflow: + result, _ := wf.env.GetDataConverter().ToPayloads("completed") + wf.mq.pushResponse(id, result) + + if msg.Failure == nil { + wf.env.Complete(payloads, nil) + } else { + wf.env.Complete(nil, bindings.ConvertFailureToError(msg.Failure, wf.env.GetDataConverter())) + } + + case *rrt.ContinueAsNew: + result, _ := wf.env.GetDataConverter().ToPayloads("completed") + wf.mq.pushResponse(id, result) + + wf.env.Complete(nil, &workflow.ContinueAsNewError{ + WorkflowType: &bindings.WorkflowType{Name: cmd.Name}, + Input: payloads, + Header: wf.header, + TaskQueueName: cmd.Options.TaskQueueName, + WorkflowExecutionTimeout: cmd.Options.WorkflowExecutionTimeout, + WorkflowRunTimeout: cmd.Options.WorkflowRunTimeout, + WorkflowTaskTimeout: cmd.Options.WorkflowTaskTimeout, + }) + + case *rrt.SignalExternalWorkflow: + wf.env.SignalExternalWorkflow( + cmd.Namespace, + cmd.WorkflowID, + cmd.RunID, + cmd.Signal, + payloads, + nil, + cmd.ChildWorkflowOnly, + wf.createCallback(id), + ) + + case *rrt.CancelExternalWorkflow: + wf.env.RequestCancelExternalWorkflow(cmd.Namespace, cmd.WorkflowID, cmd.RunID, wf.createCallback(id)) + + case *rrt.Cancel: + err = wf.canceller.cancel(cmd.CommandIDs...) + if err != nil { + return errors.E(op, err) + } + + result, _ := wf.env.GetDataConverter().ToPayloads("completed") + wf.mq.pushResponse(id, result) + + err = wf.flushQueue() + if err != nil { + panic(err) + } + + case *rrt.Panic: + panic(errors.E(cmd.Message)) + + default: + panic("undefined command") + } + + return nil +} + +func (wf *workflowProcess) createCallback(id uint64) bindings.ResultHandler { + callback := func(result *commonpb.Payloads, err error) error { + wf.canceller.discard(id) + + if err != nil { + wf.mq.pushError(id, bindings.ConvertErrorToFailure(err, wf.env.GetDataConverter())) + return nil + } + + // fetch original payload + wf.mq.pushResponse(id, result) + return nil + } + + return func(result *commonpb.Payloads, err error) { + // timer cancel callback can happen inside the loop + if wf.inLoop { + err := callback(result, err) + if err != nil { + panic(err) + } + + return + } + + wf.callbacks = append(wf.callbacks, func() error { + return callback(result, err) + }) + } +} + +// callback to be called inside the queue processing, adds new messages at the end of the queue +func (wf *workflowProcess) createContinuableCallback(id uint64) bindings.ResultHandler { + callback := func(result *commonpb.Payloads, err error) { + wf.canceller.discard(id) + + if err != nil { + wf.mq.pushError(id, bindings.ConvertErrorToFailure(err, wf.env.GetDataConverter())) + return + } + + wf.mq.pushResponse(id, result) + err = wf.flushQueue() + if err != nil { + panic(err) + } + } + + return func(result *commonpb.Payloads, err error) { + callback(result, err) + } +} + +// Exchange messages between host and worker processes and add new commands to the queue. +func (wf *workflowProcess) flushQueue() error { + const op = errors.Op("flush queue") + messages, err := wf.codec.Execute(wf.pool, wf.getContext(), wf.mq.queue...) + wf.mq.flush() + + if err != nil { + return errors.E(op, err) + } + + wf.pipeline = append(wf.pipeline, messages...) + + return nil +} + +// Exchange messages between host and worker processes without adding new commands to the queue. +func (wf *workflowProcess) discardQueue() ([]rrt.Message, error) { + const op = errors.Op("discard queue") + messages, err := wf.codec.Execute(wf.pool, wf.getContext(), wf.mq.queue...) + wf.mq.flush() + + if err != nil { + return nil, errors.E(op, err) + } + + return messages, nil +} + +// Run single command and return single result. +func (wf *workflowProcess) runCommand(cmd interface{}, payloads *commonpb.Payloads) (rrt.Message, error) { + const op = errors.Op("workflow_process_runcommand") + _, msg := wf.mq.allocateMessage(cmd, payloads) + + result, err := wf.codec.Execute(wf.pool, wf.getContext(), msg) + if err != nil { + return rrt.Message{}, errors.E(op, err) + } + + if len(result) != 1 { + return rrt.Message{}, errors.E(op, errors.Str("unexpected worker response")) + } + + return result[0], nil +} diff --git a/plugins/temporal/workflow/workflow_pool.go b/plugins/temporal/workflow/workflow_pool.go new file mode 100644 index 00000000..b9ed46c8 --- /dev/null +++ b/plugins/temporal/workflow/workflow_pool.go @@ -0,0 +1,190 @@ +package workflow + +import ( + "context" + "sync" + "sync/atomic" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/payload" + rrWorker "github.com/spiral/roadrunner/v2/pkg/worker" + "github.com/spiral/roadrunner/v2/plugins/server" + "github.com/spiral/roadrunner/v2/plugins/temporal/client" + rrt "github.com/spiral/roadrunner/v2/plugins/temporal/protocol" + bindings "go.temporal.io/sdk/internalbindings" + "go.temporal.io/sdk/worker" + "go.temporal.io/sdk/workflow" +) + +const eventWorkerExit = 8390 + +// RR_MODE env variable key +const RR_MODE = "RR_MODE" //nolint + +// RR_CODEC env variable key +const RR_CODEC = "RR_CODEC" //nolint + +type workflowPool interface { + SeqID() uint64 + Exec(p payload.Payload) (payload.Payload, error) + Start(ctx context.Context, temporal client.Temporal) error + Destroy(ctx context.Context) error + Workers() []rrWorker.BaseProcess + WorkflowNames() []string +} + +// PoolEvent triggered on workflow pool worker events. +type PoolEvent struct { + Event int + Context interface{} + Caused error +} + +// workflowPoolImpl manages workflowProcess executions between worker restarts. +type workflowPoolImpl struct { + codec rrt.Codec + seqID uint64 + workflows map[string]rrt.WorkflowInfo + tWorkers []worker.Worker + mu sync.Mutex + worker rrWorker.SyncWorker + active bool +} + +// newWorkflowPool creates new workflow pool. +func newWorkflowPool(codec rrt.Codec, listener events.Listener, factory server.Server) (workflowPool, error) { + const op = errors.Op("new_workflow_pool") + w, err := factory.NewWorker( + context.Background(), + map[string]string{RR_MODE: RRMode, RR_CODEC: codec.GetName()}, + listener, + ) + if err != nil { + return nil, errors.E(op, err) + } + + go func() { + err := w.Wait() + listener(PoolEvent{Event: eventWorkerExit, Caused: err}) + }() + + return &workflowPoolImpl{codec: codec, worker: rrWorker.From(w)}, nil +} + +// Start the pool in non blocking mode. +func (pool *workflowPoolImpl) Start(ctx context.Context, temporal client.Temporal) error { + const op = errors.Op("workflow_pool_start") + pool.mu.Lock() + pool.active = true + pool.mu.Unlock() + + err := pool.initWorkers(ctx, temporal) + if err != nil { + return errors.E(op, err) + } + + for i := 0; i < len(pool.tWorkers); i++ { + err := pool.tWorkers[i].Start() + if err != nil { + return errors.E(op, err) + } + } + + return nil +} + +// Active. +func (pool *workflowPoolImpl) Active() bool { + return pool.active +} + +// Destroy stops all temporal workers and application worker. +func (pool *workflowPoolImpl) Destroy(ctx context.Context) error { + pool.mu.Lock() + defer pool.mu.Unlock() + const op = errors.Op("workflow_pool_destroy") + + pool.active = false + for i := 0; i < len(pool.tWorkers); i++ { + pool.tWorkers[i].Stop() + } + + worker.PurgeStickyWorkflowCache() + + err := pool.worker.Stop() + if err != nil { + return errors.E(op, err) + } + + return nil +} + +// NewWorkflowDefinition initiates new workflow process. +func (pool *workflowPoolImpl) NewWorkflowDefinition() bindings.WorkflowDefinition { + return &workflowProcess{ + codec: pool.codec, + pool: pool, + } +} + +// NewWorkflowDefinition initiates new workflow process. +func (pool *workflowPoolImpl) SeqID() uint64 { + return atomic.AddUint64(&pool.seqID, 1) +} + +// Exec set of commands in thread safe move. +func (pool *workflowPoolImpl) Exec(p payload.Payload) (payload.Payload, error) { + pool.mu.Lock() + defer pool.mu.Unlock() + + if !pool.active { + return payload.Payload{}, nil + } + + return pool.worker.Exec(p) +} + +func (pool *workflowPoolImpl) Workers() []rrWorker.BaseProcess { + return []rrWorker.BaseProcess{pool.worker} +} + +func (pool *workflowPoolImpl) WorkflowNames() []string { + names := make([]string, 0, len(pool.workflows)) + for name := range pool.workflows { + names = append(names, name) + } + + return names +} + +// initWorkers request workers workflows from underlying PHP and configures temporal workers linked to the pool. +func (pool *workflowPoolImpl) initWorkers(ctx context.Context, temporal client.Temporal) error { + const op = errors.Op("workflow_pool_init_workers") + workerInfo, err := rrt.FetchWorkerInfo(pool.codec, pool, temporal.GetDataConverter()) + if err != nil { + return errors.E(op, err) + } + + pool.workflows = make(map[string]rrt.WorkflowInfo) + pool.tWorkers = make([]worker.Worker, 0) + + for _, info := range workerInfo { + w, err := temporal.CreateWorker(info.TaskQueue, info.Options) + if err != nil { + return errors.E(op, err, pool.Destroy(ctx)) + } + + pool.tWorkers = append(pool.tWorkers, w) + for _, workflowInfo := range info.Workflows { + w.RegisterWorkflowWithOptions(pool, workflow.RegisterOptions{ + Name: workflowInfo.Name, + DisableAlreadyRegisteredCheck: false, + }) + + pool.workflows[workflowInfo.Name] = workflowInfo + } + } + + return nil +} diff --git a/tests/composer.json b/tests/composer.json index 889b6808..060f105e 100644 --- a/tests/composer.json +++ b/tests/composer.json @@ -1,14 +1,15 @@ { - "minimum-stability": "beta", + "minimum-stability": "dev", "prefer-stable": true, "require": { "nyholm/psr7": "^1.3", "spiral/roadrunner": "^2.0", - "spiral/roadrunner-http": "^2.0" + "spiral/roadrunner-http": "^2.0", + "temporal/sdk": "dev-master" }, "autoload": { "psr-4": { - "Spiral\\RoadRunner\\": "src/" + "Temporal\\Tests\\": "temporal" } } } diff --git a/tests/docker-compose.yaml b/tests/docker-compose.yaml index fd1a48bf..fa1070e1 100644 --- a/tests/docker-compose.yaml +++ b/tests/docker-compose.yaml @@ -4,4 +4,36 @@ services: memcached: image: memcached:latest ports: - - "0.0.0.0:11211:11211"
\ No newline at end of file + - "0.0.0.0:11211:11211" + cassandra: + image: cassandra:3.11 + ports: + - "9042:9042" + temporal: + image: temporalio/auto-setup:${SERVER_TAG:-1.1.0} + ports: + - "7233:7233" + volumes: + - ${DYNAMIC_CONFIG_DIR:-../config/dynamicconfig}:/etc/temporal/config/dynamicconfig + environment: + - "CASSANDRA_SEEDS=cassandra" + - "DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development.yaml" + depends_on: + - cassandra + temporal-admin-tools: + image: temporalio/admin-tools:${SERVER_TAG:-1.1.0} + stdin_open: true + tty: true + environment: + - "TEMPORAL_CLI_ADDRESS=temporal:7233" + depends_on: + - temporal + temporal-web: + image: temporalio/web:${WEB_TAG:-1.5.0} + environment: + - "TEMPORAL_GRPC_ENDPOINT=temporal:7233" + - "TEMPORAL_PERMIT_WRITE_API=true" + ports: + - "8088:8088" + depends_on: + - temporal
\ No newline at end of file diff --git a/tests/plugins/http/handler_test.go b/tests/plugins/http/handler_test.go index e47dbd44..fc672e36 100644 --- a/tests/plugins/http/handler_test.go +++ b/tests/plugins/http/handler_test.go @@ -156,7 +156,7 @@ func TestHandler_Empty_User_Agent(t *testing.T) { }, nil, p) assert.NoError(t, err) - hs := &http.Server{Addr: ":8088", Handler: h} + hs := &http.Server{Addr: ":19658", Handler: h} defer func() { err := hs.Shutdown(context.Background()) if err != nil { @@ -172,7 +172,7 @@ func TestHandler_Empty_User_Agent(t *testing.T) { }() time.Sleep(time.Millisecond * 10) - req, err := http.NewRequest("GET", "http://localhost:8088?hello=world", nil) + req, err := http.NewRequest("GET", "http://localhost:19658?hello=world", nil) assert.NoError(t, err) req.Header.Add("user-agent", "") @@ -216,7 +216,7 @@ func TestHandler_User_Agent(t *testing.T) { }, nil, p) assert.NoError(t, err) - hs := &http.Server{Addr: ":8088", Handler: h} + hs := &http.Server{Addr: ":25688", Handler: h} defer func() { err := hs.Shutdown(context.Background()) if err != nil { @@ -232,7 +232,7 @@ func TestHandler_User_Agent(t *testing.T) { }() time.Sleep(time.Millisecond * 10) - req, err := http.NewRequest("GET", "http://localhost:8088?hello=world", nil) + req, err := http.NewRequest("GET", "http://localhost:25688?hello=world", nil) assert.NoError(t, err) req.Header.Add("User-Agent", "go-agent") diff --git a/tests/plugins/temporal/.rr.yaml b/tests/plugins/temporal/.rr.yaml new file mode 100644 index 00000000..04d0730d --- /dev/null +++ b/tests/plugins/temporal/.rr.yaml @@ -0,0 +1,22 @@ +# Application configuration +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php worker.php" + +# Workflow and activity mesh service +temporal: + address: "localhost:7233" + activities: + num_workers: 4 + codec: json + debug_level: 2 + +logs: + mode: none + channels: + activities: + mode: none + #workflows: + #mode: none
\ No newline at end of file diff --git a/tests/plugins/temporal/cancel_test.go b/tests/plugins/temporal/cancel_test.go new file mode 100644 index 00000000..0fd3c126 --- /dev/null +++ b/tests/plugins/temporal/cancel_test.go @@ -0,0 +1,291 @@ +package tests + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.temporal.io/api/enums/v1" + "go.temporal.io/api/history/v1" + "go.temporal.io/sdk/client" +) + +func Test_SimpleWorkflowCancel(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "SimpleSignalledWorkflow") + assert.NoError(t, err) + + time.Sleep(time.Millisecond * 500) + err = s.Client().CancelWorkflow(context.Background(), w.GetID(), w.GetRunID()) + assert.NoError(t, err) + + var result interface{} + assert.Error(t, w.Get(context.Background(), &result)) + + we, err := s.Client().DescribeWorkflowExecution(context.Background(), w.GetID(), w.GetRunID()) + assert.NoError(t, err) + + assert.Equal(t, "Canceled", we.WorkflowExecutionInfo.Status.String()) +} + +func Test_CancellableWorkflowScope(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "CancelledScopeWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + var result string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "yes", result) + + s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool { + return event.EventType == enums.EVENT_TYPE_TIMER_CANCELED + }) + + s.AssertNotContainsEvent(t, w, func(event *history.HistoryEvent) bool { + return event.EventType == enums.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED + }) +} + +func Test_CancelledWorkflow(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "CancelledWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + time.Sleep(time.Second) + err = s.Client().CancelWorkflow(context.Background(), w.GetID(), w.GetRunID()) + assert.NoError(t, err) + + var result interface{} + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "CANCELLED", result) +} + +func Test_CancelledWithCompensationWorkflow(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "CancelledWithCompensationWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + time.Sleep(time.Second) + err = s.Client().CancelWorkflow(context.Background(), w.GetID(), w.GetRunID()) + assert.NoError(t, err) + + var result interface{} + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "OK", result) + + e, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "getStatus") + assert.NoError(t, err) + + trace := make([]string, 0) + assert.NoError(t, e.Get(&trace)) + assert.Equal( + t, + []string{ + "yield", + "rollback", + "captured retry", + "captured promise on cancelled", + "START rollback", + "WAIT ROLLBACK", + "RESULT (ROLLBACK)", "DONE rollback", + "COMPLETE rollback", + "result: OK", + }, + trace, + ) +} + +func Test_CancelledNestedWorkflow(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "CancelledNestedWorkflow", + ) + assert.NoError(t, err) + + time.Sleep(time.Second) + + err = s.Client().CancelWorkflow(context.Background(), w.GetID(), w.GetRunID()) + assert.NoError(t, err) + + var result interface{} + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "CANCELLED", result) + + e, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "getStatus") + assert.NoError(t, err) + + trace := make([]string, 0) + assert.NoError(t, e.Get(&trace)) + assert.Equal( + t, + []string{ + "begin", + "first scope", + "second scope", + "close second scope", + "close first scope", + "second scope cancelled", + "first scope cancelled", + "close process", + }, + trace, + ) +} + +func Test_CancelledNSingleScopeWorkflow(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "CancelledSingleScopeWorkflow", + ) + assert.NoError(t, err) + + time.Sleep(time.Second) + + err = s.Client().CancelWorkflow(context.Background(), w.GetID(), w.GetRunID()) + assert.NoError(t, err) + + var result interface{} + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "OK", result) + + e, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "getStatus") + assert.NoError(t, err) + + trace := make([]string, 0) + assert.NoError(t, e.Get(&trace)) + assert.Equal( + t, + []string{ + "start", + "in scope", + "on cancel", + "captured in scope", + "captured in process", + }, + trace, + ) +} + +func Test_CancelledMidflightWorkflow(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "CancelledMidflightWorkflow", + ) + assert.NoError(t, err) + + var result interface{} + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "OK", result) + + e, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "getStatus") + assert.NoError(t, err) + + trace := make([]string, 0) + assert.NoError(t, e.Get(&trace)) + assert.Equal( + t, + []string{ + "start", + "in scope", + "on cancel", + "done cancel", + }, + trace, + ) + + s.AssertNotContainsEvent(t, w, func(event *history.HistoryEvent) bool { + return event.EventType == enums.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED + }) +} + +func Test_CancelSignalledChildWorkflow(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "CancelSignalledChildWorkflow", + ) + assert.NoError(t, err) + + var result interface{} + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "cancelled ok", result) + + e, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "getStatus") + assert.NoError(t, err) + + trace := make([]string, 0) + assert.NoError(t, e.Get(&trace)) + assert.Equal( + t, + []string{ + "start", + "child started", + "child signalled", + "scope cancelled", + "process done", + }, + trace, + ) + + s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool { + return event.EventType == enums.EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED + }) +} diff --git a/tests/plugins/temporal/child_test.go b/tests/plugins/temporal/child_test.go new file mode 100644 index 00000000..49521791 --- /dev/null +++ b/tests/plugins/temporal/child_test.go @@ -0,0 +1,84 @@ +package tests + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "go.temporal.io/sdk/client" +) + +func Test_ExecuteChildWorkflow(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "WithChildWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + var result string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "Child: CHILD HELLO WORLD", result) +} + +func Test_ExecuteChildStubWorkflow(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "WithChildStubWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + var result string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "Child: CHILD HELLO WORLD", result) +} + +func Test_ExecuteChildStubWorkflow_02(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "ChildStubWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + var result []string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, []string{"HELLO WORLD", "UNTYPED"}, result) +} + +func Test_SignalChildViaStubWorkflow(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "SignalChildViaStubWorkflow", + ) + assert.NoError(t, err) + + var result int + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, 8, result) +} diff --git a/tests/plugins/temporal/disaster_test.go b/tests/plugins/temporal/disaster_test.go new file mode 100644 index 00000000..9ca4d018 --- /dev/null +++ b/tests/plugins/temporal/disaster_test.go @@ -0,0 +1,114 @@ +package tests + +import ( + "context" + "os" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.temporal.io/sdk/client" +) + +func Test_WorkerError_DisasterRecovery(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + p, err := os.FindProcess(int(s.workflows.Workers()[0].Pid())) + assert.NoError(t, err) + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "TimerWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + time.Sleep(time.Millisecond * 750) + + // must fully recover with new worker + assert.NoError(t, p.Kill()) + + var result string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "hello world", result) +} + +func Test_WorkerError_DisasterRecovery_Heavy(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + defer func() { + // always restore script + _ = os.Rename("worker.bak", "worker.php") + }() + + // Makes worker pool unable to recover for some time + _ = os.Rename("worker.php", "worker.bak") + + p, err := os.FindProcess(int(s.workflows.Workers()[0].Pid())) + assert.NoError(t, err) + + // must fully recover with new worker + assert.NoError(t, p.Kill()) + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "TimerWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + time.Sleep(time.Millisecond * 750) + + // restore the script and recover activity pool + _ = os.Rename("worker.bak", "worker.php") + + var result string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "hello world", result) +} + +func Test_ActivityError_DisasterRecovery(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + defer func() { + // always restore script + _ = os.Rename("worker.bak", "worker.php") + }() + + // Makes worker pool unable to recover for some time + _ = os.Rename("worker.php", "worker.bak") + + // destroys all workers in activities + for _, wrk := range s.activities.Workers() { + assert.NoError(t, wrk.Kill()) + } + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "SimpleWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + // activity can't complete at this moment + time.Sleep(time.Millisecond * 750) + + // restore the script and recover activity pool + _ = os.Rename("worker.bak", "worker.php") + + var result string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "HELLO WORLD", result) +} diff --git a/tests/plugins/temporal/hp_test.go b/tests/plugins/temporal/hp_test.go new file mode 100644 index 00000000..bceac025 --- /dev/null +++ b/tests/plugins/temporal/hp_test.go @@ -0,0 +1,408 @@ +package tests + +import ( + "context" + "crypto/rand" + "crypto/sha512" + "fmt" + "testing" + "time" + + "go.temporal.io/api/common/v1" + + "github.com/fatih/color" + "github.com/stretchr/testify/assert" + "go.temporal.io/api/enums/v1" + "go.temporal.io/api/history/v1" + "go.temporal.io/sdk/client" +) + +func init() { + color.NoColor = false +} + +func Test_VerifyRegistration(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + assert.Contains(t, s.workflows.WorkflowNames(), "SimpleWorkflow") + + assert.Contains(t, s.activities.ActivityNames(), "SimpleActivity.echo") + assert.Contains(t, s.activities.ActivityNames(), "HeartBeatActivity.doSomething") + + assert.Contains(t, s.activities.ActivityNames(), "SimpleActivity.lower") +} + +func Test_ExecuteSimpleWorkflow_1(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "SimpleWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + var result string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "HELLO WORLD", result) +} + +type User struct { + Name string + Email string +} + +func Test_ExecuteSimpleDTOWorkflow(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "SimpleDTOWorkflow", + User{ + Name: "Antony", + Email: "[email protected]", + }, + ) + assert.NoError(t, err) + + var result struct{ Message string } + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "Hello Antony <[email protected]>", result.Message) +} + +func Test_ExecuteSimpleWorkflowWithSequenceInBatch(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "WorkflowWithSequence", + "Hello World", + ) + assert.NoError(t, err) + + var result string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "OK", result) +} + +func Test_MultipleWorkflowsInSingleWorker(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "SimpleWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + w2, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "TimerWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + var result string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "HELLO WORLD", result) + + assert.NoError(t, w2.Get(context.Background(), &result)) + assert.Equal(t, "hello world", result) +} + +func Test_ExecuteWorkflowWithParallelScopes(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "ParallelScopesWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + var result string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "HELLO WORLD|Hello World|hello world", result) +} + +func Test_Timer(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + start := time.Now() + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "TimerWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + var result string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "hello world", result) + assert.True(t, time.Since(start).Seconds() > 1) + + s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool { + return event.EventType == enums.EVENT_TYPE_TIMER_STARTED + }) +} + +func Test_SideEffect(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "SideEffectWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + var result string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Contains(t, result, "hello world-") + + s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool { + return event.EventType == enums.EVENT_TYPE_MARKER_RECORDED + }) +} + +func Test_EmptyWorkflow(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "EmptyWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + var result int + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, 42, result) +} + +func Test_PromiseChaining(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "ChainedWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + var result string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "result:hello world", result) +} + +func Test_ActivityHeartbeat(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "SimpleHeartbeatWorkflow", + 2, + ) + assert.NoError(t, err) + + time.Sleep(time.Second) + + we, err := s.Client().DescribeWorkflowExecution(context.Background(), w.GetID(), w.GetRunID()) + assert.NoError(t, err) + assert.Len(t, we.PendingActivities, 1) + + act := we.PendingActivities[0] + + assert.Equal(t, `{"value":2}`, string(act.HeartbeatDetails.Payloads[0].Data)) + + var result string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "OK", result) +} + +func Test_FailedActivityHeartbeat(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "FailedHeartbeatWorkflow", + 8, + ) + assert.NoError(t, err) + + time.Sleep(time.Second) + + we, err := s.Client().DescribeWorkflowExecution(context.Background(), w.GetID(), w.GetRunID()) + assert.NoError(t, err) + assert.Len(t, we.PendingActivities, 1) + + act := we.PendingActivities[0] + + assert.Equal(t, `{"value":8}`, string(act.HeartbeatDetails.Payloads[0].Data)) + + var result string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "OK!", result) +} + +func Test_BinaryPayload(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + rnd := make([]byte, 2500) + + _, err := rand.Read(rnd) + assert.NoError(t, err) + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "BinaryWorkflow", + rnd, + ) + assert.NoError(t, err) + + var result string + assert.NoError(t, w.Get(context.Background(), &result)) + + assert.Equal(t, fmt.Sprintf("%x", sha512.Sum512(rnd)), result) +} + +func Test_ContinueAsNew(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "ContinuableWorkflow", + 1, + ) + assert.NoError(t, err) + + time.Sleep(time.Second) + + we, err := s.Client().DescribeWorkflowExecution(context.Background(), w.GetID(), w.GetRunID()) + assert.NoError(t, err) + + assert.Equal(t, "ContinuedAsNew", we.WorkflowExecutionInfo.Status.String()) + + time.Sleep(time.Second) + + // the result of the final workflow + var result string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "OK6", result) +} + +func Test_ActivityStubWorkflow(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "ActivityStubWorkflow", + "hello world", + ) + assert.NoError(t, err) + + // the result of the final workflow + var result []string + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, []string{ + "HELLO WORLD", + "invalid method call", + "UNTYPED", + }, result) +} + +func Test_ExecuteProtoWorkflow(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "ProtoPayloadWorkflow", + ) + assert.NoError(t, err) + + var result common.WorkflowExecution + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, "updated", result.RunId) + assert.Equal(t, "workflow id", result.WorkflowId) +} + +func Test_SagaWorkflow(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "SagaWorkflow", + ) + assert.NoError(t, err) + + var result string + assert.Error(t, w.Get(context.Background(), &result)) +} diff --git a/tests/plugins/temporal/query_test.go b/tests/plugins/temporal/query_test.go new file mode 100644 index 00000000..8b0caeee --- /dev/null +++ b/tests/plugins/temporal/query_test.go @@ -0,0 +1,66 @@ +package tests + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.temporal.io/sdk/client" +) + +func Test_ListQueries(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "QueryWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + time.Sleep(time.Millisecond * 500) + + v, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "error", -1) + assert.Nil(t, v) + assert.Error(t, err) + + assert.Contains(t, err.Error(), "KnownQueryTypes=[get]") + + var r int + assert.NoError(t, w.Get(context.Background(), &r)) + assert.Equal(t, 0, r) +} + +func Test_GetQuery(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "QueryWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "add", 88) + assert.NoError(t, err) + time.Sleep(time.Millisecond * 500) + + v, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "get", nil) + assert.NoError(t, err) + + var r int + assert.NoError(t, v.Get(&r)) + assert.Equal(t, 88, r) + + assert.NoError(t, w.Get(context.Background(), &r)) + assert.Equal(t, 88, r) +} diff --git a/tests/plugins/temporal/server_test.go b/tests/plugins/temporal/server_test.go new file mode 100644 index 00000000..c8d815c3 --- /dev/null +++ b/tests/plugins/temporal/server_test.go @@ -0,0 +1,198 @@ +package tests + +import ( + "context" + "testing" + + endure "github.com/spiral/endure/pkg/container" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/informer" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/resetter" + "github.com/spiral/roadrunner/v2/plugins/rpc" + "github.com/spiral/roadrunner/v2/plugins/server" + "github.com/spiral/roadrunner/v2/plugins/temporal/activity" + rrClient "github.com/spiral/roadrunner/v2/plugins/temporal/client" + "github.com/spiral/roadrunner/v2/plugins/temporal/workflow" + "go.temporal.io/api/enums/v1" + "go.temporal.io/api/history/v1" + temporalClient "go.temporal.io/sdk/client" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +type TestServer struct { + container endure.Container + temporal rrClient.Temporal + activities *activity.Plugin + workflows *workflow.Plugin +} + +type ConfigOption struct { + Name string + Value interface{} +} + +func NewOption(name string, value interface{}) ConfigOption { + return ConfigOption{Name: name, Value: value} +} + +func NewTestServer(opt ...ConfigOption) *TestServer { + e, err := endure.NewContainer(initLogger(), endure.RetryOnFail(false)) + if err != nil { + panic(err) + } + + t := &rrClient.Plugin{} + a := &activity.Plugin{} + w := &workflow.Plugin{} + + if err := e.Register(initConfig(opt...)); err != nil { + panic(err) + } + + if err := e.Register(&logger.ZapLogger{}); err != nil { + panic(err) + } + if err := e.Register(&resetter.Plugin{}); err != nil { + panic(err) + } + if err := e.Register(&informer.Plugin{}); err != nil { + panic(err) + } + if err := e.Register(&server.Plugin{}); err != nil { + panic(err) + } + if err := e.Register(&rpc.Plugin{}); err != nil { + panic(err) + } + + if err := e.Register(t); err != nil { + panic(err) + } + if err := e.Register(a); err != nil { + panic(err) + } + if err := e.Register(w); err != nil { + panic(err) + } + + if err := e.Init(); err != nil { + panic(err) + } + + errCh, err := e.Serve() + if err != nil { + panic(err) + } + + go func() { + err := <-errCh + er := e.Stop() + if er != nil { + panic(err) + } + }() + + return &TestServer{container: e, temporal: t, activities: a, workflows: w} +} + +func (s *TestServer) Client() temporalClient.Client { + return s.temporal.GetClient() +} + +func (s *TestServer) MustClose() { + err := s.container.Stop() + if err != nil { + panic(err) + } +} + +func initConfig(opt ...ConfigOption) config.Configurer { + cfg := &config.Viper{} + cfg.Path = ".rr.yaml" + cfg.Prefix = "rr" + + return cfg +} + +func initLogger() *zap.Logger { + cfg := zap.Config{ + Level: zap.NewAtomicLevelAt(zap.ErrorLevel), + Encoding: "console", + EncoderConfig: zapcore.EncoderConfig{ + MessageKey: "message", + LevelKey: "level", + TimeKey: "time", + CallerKey: "caller", + NameKey: "name", + StacktraceKey: "stack", + EncodeLevel: zapcore.CapitalLevelEncoder, + EncodeTime: zapcore.ISO8601TimeEncoder, + EncodeCaller: zapcore.ShortCallerEncoder, + }, + OutputPaths: []string{"stderr"}, + ErrorOutputPaths: []string{"stderr"}, + } + + l, err := cfg.Build(zap.AddCaller()) + if err != nil { + panic(err) + } + + return l +} + +func (s *TestServer) AssertContainsEvent(t *testing.T, w temporalClient.WorkflowRun, assert func(*history.HistoryEvent) bool) { + i := s.Client().GetWorkflowHistory( + context.Background(), + w.GetID(), + w.GetRunID(), + false, + enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT, + ) + + for { + if !i.HasNext() { + t.Error("no more events and no match found") + break + } + + e, err := i.Next() + if err != nil { + t.Error("unable to read history event") + break + } + + if assert(e) { + break + } + } +} + +func (s *TestServer) AssertNotContainsEvent(t *testing.T, w temporalClient.WorkflowRun, assert func(*history.HistoryEvent) bool) { + i := s.Client().GetWorkflowHistory( + context.Background(), + w.GetID(), + w.GetRunID(), + false, + enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT, + ) + + for { + if !i.HasNext() { + break + } + + e, err := i.Next() + if err != nil { + t.Error("unable to read history event") + break + } + + if assert(e) { + t.Error("found unexpected event") + break + } + } +} diff --git a/tests/plugins/temporal/signal_test.go b/tests/plugins/temporal/signal_test.go new file mode 100644 index 00000000..51826287 --- /dev/null +++ b/tests/plugins/temporal/signal_test.go @@ -0,0 +1,170 @@ +package tests + +import ( + "context" + "testing" + "time" + + "github.com/pborman/uuid" + "github.com/stretchr/testify/assert" + "go.temporal.io/api/enums/v1" + "go.temporal.io/api/history/v1" + "go.temporal.io/sdk/client" +) + +func Test_SignalsWithoutSignals(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "SimpleSignalledWorkflow", + "Hello World", + ) + assert.NoError(t, err) + + var result int + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, 0, result) +} + +func Test_SendSignalDuringTimer(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().SignalWithStartWorkflow( + context.Background(), + "signalled-"+uuid.New(), + "add", + 10, + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "SimpleSignalledWorkflow", + ) + assert.NoError(t, err) + + err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "add", -1) + assert.NoError(t, err) + + var result int + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, 9, result) + + s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool { + if event.EventType == enums.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED { + attr := event.Attributes.(*history.HistoryEvent_WorkflowExecutionSignaledEventAttributes) + return attr.WorkflowExecutionSignaledEventAttributes.SignalName == "add" + } + + return false + }) +} + +func Test_SendSignalBeforeCompletingWorkflow(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "SimpleSignalledWorkflowWithSleep", + ) + assert.NoError(t, err) + + // should be around sleep(1) call + time.Sleep(time.Second + time.Millisecond*200) + + err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "add", -1) + assert.NoError(t, err) + + var result int + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, -1, result) + + s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool { + if event.EventType == enums.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED { + attr := event.Attributes.(*history.HistoryEvent_WorkflowExecutionSignaledEventAttributes) + return attr.WorkflowExecutionSignaledEventAttributes.SignalName == "add" + } + + return false + }) +} + +func Test_RuntimeSignal(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().SignalWithStartWorkflow( + context.Background(), + "signalled-"+uuid.New(), + "add", + -1, + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "RuntimeSignalWorkflow", + ) + assert.NoError(t, err) + + var result int + assert.NoError(t, w.Get(context.Background(), &result)) + assert.Equal(t, -1, result) + + s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool { + if event.EventType == enums.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED { + attr := event.Attributes.(*history.HistoryEvent_WorkflowExecutionSignaledEventAttributes) + return attr.WorkflowExecutionSignaledEventAttributes.SignalName == "add" + } + + return false + }) +} + +func Test_SignalSteps(t *testing.T) { + s := NewTestServer() + defer s.MustClose() + + w, err := s.Client().ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "default", + }, + "WorkflowWithSignalledSteps", + ) + assert.NoError(t, err) + + err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "begin", true) + assert.NoError(t, err) + + err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "next1", true) + assert.NoError(t, err) + + v, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "value", nil) + assert.NoError(t, err) + + var r int + assert.NoError(t, v.Get(&r)) + assert.Equal(t, 2, r) + + err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "next2", true) + assert.NoError(t, err) + + v, err = s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "value", nil) + assert.NoError(t, err) + + assert.NoError(t, v.Get(&r)) + assert.Equal(t, 3, r) + + var result int + assert.NoError(t, w.Get(context.Background(), &result)) + + // 3 ticks + assert.Equal(t, 3, result) +} diff --git a/tests/plugins/temporal/worker.php b/tests/plugins/temporal/worker.php new file mode 100644 index 00000000..0d0263e7 --- /dev/null +++ b/tests/plugins/temporal/worker.php @@ -0,0 +1,33 @@ +<?php + +declare(strict_types=1); + +require __DIR__ . '/../../vendor/autoload.php'; + +/** + * @param string $dir + * @return array<string> + */ +$getClasses = static function (string $dir): iterable { + $files = glob($dir . '/*.php'); + + foreach ($files as $file) { + yield substr(basename($file), 0, -4); + } +}; + +$factory = \Temporal\WorkerFactory::create(); + +$worker = $factory->newWorker('default'); + +// register all workflows +foreach ($getClasses(__DIR__ . '/../../temporal/Workflow') as $name) { + $worker->registerWorkflowType('Temporal\\Tests\\Workflow\\' . $name); +} + +// register all activity +foreach ($getClasses(__DIR__ . '/../../temporal/Activity') as $name) { + $worker->registerActivityType('Temporal\\Tests\\Activity\\' . $name); +} + +$factory->run(); diff --git a/tests/psr-worker-bench.php b/tests/psr-worker-bench.php index d382098a..ef741a61 100644 --- a/tests/psr-worker-bench.php +++ b/tests/psr-worker-bench.php @@ -1,24 +1,58 @@ <?php +declare(strict_types=1); + use Spiral\RoadRunner; use Nyholm\Psr7\Factory; ini_set('display_errors', 'stderr'); include "vendor/autoload.php"; -$worker = new RoadRunner\Http\PSR7Worker( - RoadRunner\Worker::create(), - new Factory\Psr17Factory(), - new Factory\Psr17Factory(), - new Factory\Psr17Factory() -); - -while ($req = $worker->waitRequest()) { - try { - $rsp = new \Nyholm\Psr7\Response(); - $rsp->getBody()->write("hello world"); - $worker->respond($rsp); - } catch (\Throwable $e) { - $worker->getWorker()->error((string)$e); +$env = \Spiral\RoadRunner\Environment::fromGlobals(); + +if ($env->getMode() === 'http') { + $worker = new RoadRunner\Http\PSR7Worker( + RoadRunner\Worker::create(), + new Factory\Psr17Factory(), + new Factory\Psr17Factory(), + new Factory\Psr17Factory() + ); + + while ($req = $worker->waitRequest()) { + try { + $rsp = new \Nyholm\Psr7\Response(); + $rsp->getBody()->write("hello world"); + $worker->respond($rsp); + } catch (\Throwable $e) { + $worker->getWorker()->error((string)$e); + } } +} else { + /** + * @param string $dir + * @return array<string> + */ + $getClasses = static function (string $dir): iterable { + $files = glob($dir . '/*.php'); + + foreach ($files as $file) { + yield substr(basename($file), 0, -4); + } + }; + + $factory = \Temporal\WorkerFactory::create(); + + $worker = $factory->newWorker('default'); + + // register all workflows + foreach ($getClasses(__DIR__ . '/../temporal/Workflow') as $name) { + $worker->registerWorkflowType('Temporal\\Tests\\Workflow\\' . $name); + } + + // register all activity + foreach ($getClasses(__DIR__ . '/../temporal/Activity') as $name) { + $worker->registerActivityType('Temporal\\Tests\\Activity\\' . $name); + } + + $factory->run(); }
\ No newline at end of file diff --git a/tests/temporal/Activity/HeartBeatActivity.php b/tests/temporal/Activity/HeartBeatActivity.php new file mode 100644 index 00000000..acf4a451 --- /dev/null +++ b/tests/temporal/Activity/HeartBeatActivity.php @@ -0,0 +1,58 @@ +<?php + +namespace Temporal\Tests\Activity; + +use Temporal\Activity; +use Temporal\Activity\ActivityInterface; +use Temporal\Activity\ActivityMethod; +use Temporal\Roadrunner\Internal\Error; + +#[ActivityInterface(prefix: "HeartBeatActivity.")] +class HeartBeatActivity +{ + #[ActivityMethod] + public function doSomething( + int $value + ): string { + Activity::heartbeat(['value' => $value]); + sleep($value); + return 'OK'; + } + + #[ActivityMethod] + public function slow( + string $value + ): string { + for ($i = 0; $i < 5; $i++) { + Activity::heartbeat(['value' => $i]); + sleep(1); + } + + return 'OK'; + } + + #[ActivityMethod] + public function something( + string $value + ): string { + Activity::heartbeat(['value' => $value]); + sleep($value); + return 'OK'; + } + + #[ActivityMethod] + public function failedActivity( + int $value + ): string { + Activity::heartbeat(['value' => $value]); + if (Activity::getInfo()->attempt === 1) { + throw new \Error("failed"); + } + + if (!is_array(Activity::getHeartbeatDetails())) { + throw new \Error("no heartbeat details"); + } + + return 'OK!'; + } +}
\ No newline at end of file diff --git a/tests/temporal/Activity/SimpleActivity.php b/tests/temporal/Activity/SimpleActivity.php new file mode 100644 index 00000000..576b126e --- /dev/null +++ b/tests/temporal/Activity/SimpleActivity.php @@ -0,0 +1,63 @@ +<?php + +namespace Temporal\Tests\Activity; + +use Temporal\Activity\ActivityInterface; +use Temporal\Activity\ActivityMethod; +use Temporal\Api\Common\V1\WorkflowExecution; +use Temporal\DataConverter\Bytes; +use Temporal\Tests\DTO\Message; +use Temporal\Tests\DTO\User; + +#[ActivityInterface(prefix: "SimpleActivity.")] +class SimpleActivity +{ + #[ActivityMethod] + public function echo( + string $input + ): string { + return strtoupper($input); + } + + #[ActivityMethod] + public function lower( + string $input + ): string { + return strtolower($input); + } + + #[ActivityMethod] + public function greet( + User $user + ): Message { + return new Message(sprintf("Hello %s <%s>", $user->name, $user->email)); + } + + #[ActivityMethod] + public function slow( + string $input + ): string { + sleep(2); + + return strtolower($input); + } + + #[ActivityMethod] + public function sha512( + Bytes $input + ): string { + return hash("sha512", ($input->getData())); + } + + public function updateRunID(WorkflowExecution $e): WorkflowExecution + { + $e->setRunId('updated'); + return $e; + } + + #[ActivityMethod] + public function fail() + { + throw new \Error("failed activity"); + } +}
\ No newline at end of file diff --git a/tests/temporal/Client/StartNewWorkflow.php b/tests/temporal/Client/StartNewWorkflow.php new file mode 100644 index 00000000..67bc1d01 --- /dev/null +++ b/tests/temporal/Client/StartNewWorkflow.php @@ -0,0 +1,23 @@ +<?php + + +namespace Temporal\Tests\Client; + +use Temporal\Client; +use Temporal\Tests\Workflow\SimpleDTOWorkflow; + +use function Symfony\Component\String\s; + +class StartNewWorkflow +{ + private $stub; + + public function __construct(Client\ClientInterface $client) + { + $this->stub = $client->newWorkflowStub(SimpleDTOWorkflow::class); + } + + public function __invoke() + { + } +} diff --git a/tests/temporal/DTO/Message.php b/tests/temporal/DTO/Message.php new file mode 100644 index 00000000..61703fe8 --- /dev/null +++ b/tests/temporal/DTO/Message.php @@ -0,0 +1,14 @@ +<?php + + +namespace Temporal\Tests\DTO; + +class Message +{ + public string $message; + + public function __construct(string $message) + { + $this->message = $message; + } +}
\ No newline at end of file diff --git a/tests/temporal/DTO/User.php b/tests/temporal/DTO/User.php new file mode 100644 index 00000000..cefea137 --- /dev/null +++ b/tests/temporal/DTO/User.php @@ -0,0 +1,15 @@ +<?php + + +namespace Temporal\Tests\DTO; + +use Temporal\Internal\Marshaller\Meta\Marshal; + +class User +{ + #[Marshal(name: "Name")] + public string $name; + + #[Marshal(name: "Email")] + public string $email; +}
\ No newline at end of file diff --git a/tests/temporal/Workflow/ActivityStubWorkflow.php b/tests/temporal/Workflow/ActivityStubWorkflow.php new file mode 100644 index 00000000..58dcdafb --- /dev/null +++ b/tests/temporal/Workflow/ActivityStubWorkflow.php @@ -0,0 +1,39 @@ +<?php + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityOptions; +use Temporal\Tests\Activity\SimpleActivity; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; + +#[Workflow\WorkflowInterface] +class ActivityStubWorkflow +{ + #[WorkflowMethod(name: 'ActivityStubWorkflow')] + public function handler( + string $input + ) { + // typed stub + $simple = Workflow::newActivityStub( + SimpleActivity::class, + ActivityOptions::new()->withStartToCloseTimeout(5) + ); + + $result = []; + $result[] = yield $simple->echo($input); + + try { + $simple->undefined($input); + } catch (\BadMethodCallException $e) { + $result[] = 'invalid method call'; + } + + // untyped stub + $untyped = Workflow::newUntypedActivityStub(ActivityOptions::new()->withStartToCloseTimeout(1)); + + $result[] = yield $untyped->execute('SimpleActivity.echo', ['untyped']); + + return $result; + } +} diff --git a/tests/temporal/Workflow/AggregatedWorkflow.php b/tests/temporal/Workflow/AggregatedWorkflow.php new file mode 100644 index 00000000..3299179e --- /dev/null +++ b/tests/temporal/Workflow/AggregatedWorkflow.php @@ -0,0 +1,30 @@ +<?php + +namespace Temporal\Tests\Workflow; + +use Temporal\Workflow; +use Temporal\Workflow\SignalMethod; +use Temporal\Workflow\WorkflowInterface; +use Temporal\Workflow\WorkflowMethod; + +#[WorkflowInterface] +class AggregatedWorkflow +{ + private array $values = []; + + #[SignalMethod] + public function addValue( + string $value + ) { + $this->values[] = $value; + } + + #[WorkflowMethod(name: 'AggregatedWorkflow')] + public function run( + int $count + ) { + yield Workflow::await(fn() => count($this->values) === $count); + + return $this->values; + } +} diff --git a/tests/temporal/Workflow/AsyncActivityWorkflow.php b/tests/temporal/Workflow/AsyncActivityWorkflow.php new file mode 100644 index 00000000..79e45dfb --- /dev/null +++ b/tests/temporal/Workflow/AsyncActivityWorkflow.php @@ -0,0 +1,28 @@ +<?php + +declare(strict_types=1); + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityCancellationType; +use Temporal\Activity\ActivityOptions; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; +use Temporal\Tests\Activity\SimpleActivity; + +#[Workflow\WorkflowInterface] +class AsyncActivityWorkflow +{ + #[WorkflowMethod(name: 'AsyncActivityWorkflow')] + public function handler() + { + $simple = Workflow::newActivityStub( + SimpleActivity::class, + ActivityOptions::new() + ->withStartToCloseTimeout(20) + ->withCancellationType(ActivityCancellationType::WAIT_CANCELLATION_COMPLETED) + ); + + return yield $simple->external(); + } +} diff --git a/tests/temporal/Workflow/BinaryWorkflow.php b/tests/temporal/Workflow/BinaryWorkflow.php new file mode 100644 index 00000000..ed1952ad --- /dev/null +++ b/tests/temporal/Workflow/BinaryWorkflow.php @@ -0,0 +1,21 @@ +<?php + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityOptions; +use Temporal\DataConverter\Bytes; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; + +#[Workflow\WorkflowInterface] +class BinaryWorkflow +{ + #[WorkflowMethod(name: 'BinaryWorkflow')] + public function handler( + Bytes $input + ): iterable { + $opts = ActivityOptions::new()->withStartToCloseTimeout(5); + + return yield Workflow::executeActivity('SimpleActivity.sha512', [$input], $opts); + } +} diff --git a/tests/temporal/Workflow/CancelSignalledChildWorkflow.php b/tests/temporal/Workflow/CancelSignalledChildWorkflow.php new file mode 100644 index 00000000..e2e43efa --- /dev/null +++ b/tests/temporal/Workflow/CancelSignalledChildWorkflow.php @@ -0,0 +1,57 @@ +<?php + +namespace Temporal\Tests\Workflow; + +use React\Promise\Deferred; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; + +#[Workflow\WorkflowInterface] +class CancelSignalledChildWorkflow +{ + private array $status = []; + + #[Workflow\QueryMethod(name: 'getStatus')] + public function getStatus(): array + { + return $this->status; + } + + #[WorkflowMethod(name: 'CancelSignalledChildWorkflow')] + public function handler() + { + // typed stub + $simple = Workflow::newChildWorkflowStub(SimpleSignalledWorkflow::class); + + $waitSignalled = new Deferred(); + + $this->status[] = 'start'; + + // start execution + $scope = Workflow::newCancellationScope( + function () use ($simple, $waitSignalled) { + $call = $simple->handler(); + $this->status[] = 'child started'; + + yield $simple->add(8); + $this->status[] = 'child signalled'; + $waitSignalled->resolve(); + + return yield $call; + } + ); + + // only cancel scope when signal dispatched + yield $waitSignalled; + $scope->cancel(); + $this->status[] = 'scope cancelled'; + + try { + return yield $scope; + } catch (\Throwable $e) { + $this->status[] = 'process done'; + + return 'cancelled ok'; + } + } +} diff --git a/tests/temporal/Workflow/CanceledHeartbeatWorkflow.php b/tests/temporal/Workflow/CanceledHeartbeatWorkflow.php new file mode 100644 index 00000000..6b463192 --- /dev/null +++ b/tests/temporal/Workflow/CanceledHeartbeatWorkflow.php @@ -0,0 +1,29 @@ +<?php + +declare(strict_types=1); + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityCancellationType; +use Temporal\Activity\ActivityOptions; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; +use Temporal\Tests\Activity\HeartBeatActivity; + +#[Workflow\WorkflowInterface] +class CanceledHeartbeatWorkflow +{ + #[WorkflowMethod(name: 'CanceledHeartbeatWorkflow')] + public function handler(): iterable + { + $act = Workflow::newActivityStub( + HeartBeatActivity::class, + ActivityOptions::new() + ->withStartToCloseTimeout(50) + ->withCancellationType(ActivityCancellationType::WAIT_CANCELLATION_COMPLETED) + ->withHeartbeatTimeout(1) + ); + + return yield $act->slow('test'); + } +} diff --git a/tests/temporal/Workflow/CancelledMidflightWorkflow.php b/tests/temporal/Workflow/CancelledMidflightWorkflow.php new file mode 100644 index 00000000..ea799ce1 --- /dev/null +++ b/tests/temporal/Workflow/CancelledMidflightWorkflow.php @@ -0,0 +1,47 @@ +<?php + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityOptions; +use Temporal\Tests\Activity\SimpleActivity; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; + +#[Workflow\WorkflowInterface] +class CancelledMidflightWorkflow +{ + private array $status = []; + + #[Workflow\QueryMethod(name: 'getStatus')] + public function getStatus(): array + { + return $this->status; + } + + #[WorkflowMethod(name: 'CancelledMidflightWorkflow')] + public function handler() + { + $simple = Workflow::newActivityStub( + SimpleActivity::class, + ActivityOptions::new()->withStartToCloseTimeout(5) + ); + + $this->status[] = 'start'; + + $scope = Workflow::newCancellationScope( + function () use ($simple) { + $this->status[] = 'in scope'; + $simple->slow('1'); + } + )->onCancel( + function () { + $this->status[] = 'on cancel'; + } + ); + + $scope->cancel(); + $this->status[] = 'done cancel'; + + return 'OK'; + } +} diff --git a/tests/temporal/Workflow/CancelledNestedWorkflow.php b/tests/temporal/Workflow/CancelledNestedWorkflow.php new file mode 100644 index 00000000..0c82f761 --- /dev/null +++ b/tests/temporal/Workflow/CancelledNestedWorkflow.php @@ -0,0 +1,72 @@ +<?php + +namespace Temporal\Tests\Workflow; + +use Temporal\Exception\Failure\CanceledFailure; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; + +#[Workflow\WorkflowInterface] +class CancelledNestedWorkflow +{ + private array $status = []; + + #[Workflow\QueryMethod(name: 'getStatus')] + public function getStatus(): array + { + return $this->status; + } + + #[WorkflowMethod(name: 'CancelledNestedWorkflow')] + public function handler() + { + $this->status[] = 'begin'; + try { + yield Workflow::newCancellationScope( + function () { + $this->status[] = 'first scope'; + + $scope = Workflow::newCancellationScope( + function () { + $this->status[] = 'second scope'; + + try { + yield Workflow::timer(2); + } catch (CanceledFailure $e) { + $this->status[] = 'second scope cancelled'; + throw $e; + } + + $this->status[] = 'second scope done'; + } + )->onCancel( + function () { + $this->status[] = 'close second scope'; + } + ); + + try { + yield Workflow::timer(1); + } catch (CanceledFailure $e) { + $this->status[] = 'first scope cancelled'; + throw $e; + } + + $this->status[] = 'first scope done'; + + yield $scope; + } + )->onCancel( + function () { + $this->status[] = 'close first scope'; + } + ); + } catch (CanceledFailure $e) { + $this->status[] = 'close process'; + + return 'CANCELLED'; + } + + return 'OK'; + } +} diff --git a/tests/temporal/Workflow/CancelledScopeWorkflow.php b/tests/temporal/Workflow/CancelledScopeWorkflow.php new file mode 100644 index 00000000..50e0992f --- /dev/null +++ b/tests/temporal/Workflow/CancelledScopeWorkflow.php @@ -0,0 +1,39 @@ +<?php + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityOptions; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; +use Temporal\Tests\Activity\SimpleActivity; + +#[Workflow\WorkflowInterface] +class CancelledScopeWorkflow +{ + #[WorkflowMethod(name: 'CancelledScopeWorkflow')] + public function handler() + { + $simple = Workflow::newActivityStub( + SimpleActivity::class, + ActivityOptions::new()->withStartToCloseTimeout(5) + ); + + $cancelled = 'not'; + + $scope = Workflow::newCancellationScope( + function () use ($simple) { + yield Workflow::timer(2); + yield $simple->slow('hello'); + } + )->onCancel( + function () use (&$cancelled) { + $cancelled = 'yes'; + } + ); + + yield Workflow::timer(1); + $scope->cancel(); + + return $cancelled; + } +} diff --git a/tests/temporal/Workflow/CancelledSingleScopeWorkflow.php b/tests/temporal/Workflow/CancelledSingleScopeWorkflow.php new file mode 100644 index 00000000..5fe8d3d8 --- /dev/null +++ b/tests/temporal/Workflow/CancelledSingleScopeWorkflow.php @@ -0,0 +1,55 @@ +<?php + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityOptions; +use Temporal\Exception\Failure\CanceledFailure; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; +use Temporal\Tests\Activity\SimpleActivity; + +#[Workflow\WorkflowInterface] +class CancelledSingleScopeWorkflow +{ + private array $status = []; + + #[Workflow\QueryMethod(name: 'getStatus')] + public function getStatus(): array + { + return $this->status; + } + + #[WorkflowMethod(name: 'CancelledSingleScopeWorkflow')] + public function handler() + { + $simple = Workflow::newActivityStub( + SimpleActivity::class, + ActivityOptions::new() + ->withStartToCloseTimeout(5) + ); + + $this->status[] = 'start'; + try { + yield Workflow::newCancellationScope( + function () use ($simple) { + try { + $this->status[] = 'in scope'; + yield $simple->slow('1'); + } catch (CanceledFailure $e) { + // after process is complete, do not use for business logic + $this->status[] = 'captured in scope'; + throw $e; + } + } + )->onCancel( + function () { + $this->status[] = 'on cancel'; + } + ); + } catch (CanceledFailure $e) { + $this->status[] = 'captured in process'; + } + + return 'OK'; + } +} diff --git a/tests/temporal/Workflow/CancelledWithCompensationWorkflow.php b/tests/temporal/Workflow/CancelledWithCompensationWorkflow.php new file mode 100644 index 00000000..2074aac1 --- /dev/null +++ b/tests/temporal/Workflow/CancelledWithCompensationWorkflow.php @@ -0,0 +1,79 @@ +<?php + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityOptions; +use Temporal\Exception\Failure\CanceledFailure; +use Temporal\Tests\Activity\SimpleActivity; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; + +#[Workflow\WorkflowInterface] +class CancelledWithCompensationWorkflow +{ + private array $status = []; + + #[Workflow\QueryMethod(name: 'getStatus')] + public function getStatus(): array + { + return $this->status; + } + + #[WorkflowMethod(name: 'CancelledWithCompensationWorkflow')] + public function handler() + { + $simple = Workflow::newActivityStub( + SimpleActivity::class, + ActivityOptions::new()->withStartToCloseTimeout(5) + ); + + // waits for 2 seconds + $slow = $simple->slow('DOING SLOW ACTIVITY'); + + try { + $this->status[] = 'yield'; + $result = yield $slow; + } catch (CanceledFailure $e) { + $this->status[] = 'rollback'; + + try { + // must fail again + $result = yield $slow; + } catch (CanceledFailure $e) { + $this->status[] = 'captured retry'; + } + + try { + // fail since on cancelled context + $result = yield $simple->echo('echo must fail'); + } catch (CanceledFailure $e) { + $this->status[] = 'captured promise on cancelled'; + } + + $scope = Workflow::newDetachedCancellationScope( + function () use ($simple) { + $this->status[] = 'START rollback'; + + $second = yield $simple->echo('rollback'); + + $this->status[] = sprintf("RESULT (%s)", $second); + + if ($second !== 'ROLLBACK') { + $this->status[] = 'FAIL rollback'; + return 'failed to compensate ' . $second; + } + $this->status[] = 'DONE rollback'; + + return 'OK'; + } + ); + + $this->status[] = 'WAIT ROLLBACK'; + $result = yield $scope; + $this->status[] = 'COMPLETE rollback'; + } + + $this->status[] = 'result: ' . $result; + return $result; + } +} diff --git a/tests/temporal/Workflow/CancelledWorkflow.php b/tests/temporal/Workflow/CancelledWorkflow.php new file mode 100644 index 00000000..be9f7542 --- /dev/null +++ b/tests/temporal/Workflow/CancelledWorkflow.php @@ -0,0 +1,31 @@ +<?php + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityOptions; +use Temporal\Exception\Failure\CanceledFailure; +use Temporal\Tests\Activity\SimpleActivity; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; + +#[Workflow\WorkflowInterface] +class CancelledWorkflow +{ + #[WorkflowMethod(name: 'CancelledWorkflow')] + public function handler() + { + $simple = Workflow::newActivityStub( + SimpleActivity::class, + ActivityOptions::new()->withStartToCloseTimeout(5) + ); + + // waits for 2 seconds + $slow = $simple->slow('DOING SLOW ACTIVITY'); + + try { + return yield $slow; + } catch (CanceledFailure $e) { + return "CANCELLED"; + } + } +} diff --git a/tests/temporal/Workflow/ChainedWorkflow.php b/tests/temporal/Workflow/ChainedWorkflow.php new file mode 100644 index 00000000..ba9c8f96 --- /dev/null +++ b/tests/temporal/Workflow/ChainedWorkflow.php @@ -0,0 +1,31 @@ +<?php + +declare(strict_types=1); + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityOptions; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; + +#[Workflow\WorkflowInterface] +class ChainedWorkflow +{ + #[WorkflowMethod(name: 'ChainedWorkflow')] + public function handler(string $input): iterable + { + $opts = ActivityOptions::new()->withStartToCloseTimeout(5); + + return yield Workflow::executeActivity( + 'SimpleActivity.echo', + [$input], + $opts + )->then(function ($result) use ($opts) { + return Workflow::executeActivity( + 'SimpleActivity.lower', + ['Result:' . $result], + $opts + ); + }); + } +} diff --git a/tests/temporal/Workflow/ChildStubWorkflow.php b/tests/temporal/Workflow/ChildStubWorkflow.php new file mode 100644 index 00000000..608962c2 --- /dev/null +++ b/tests/temporal/Workflow/ChildStubWorkflow.php @@ -0,0 +1,30 @@ +<?php + +namespace Temporal\Tests\Workflow; + +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; + +#[Workflow\WorkflowInterface] +class ChildStubWorkflow +{ + #[WorkflowMethod(name: 'ChildStubWorkflow')] + public function handler( + string $input + ) { + // typed stub + $simple = Workflow::newChildWorkflowStub(SimpleWorkflow::class); + + $result = []; + $result[] = yield $simple->handler($input); + + // untyped + $untyped = Workflow::newUntypedChildWorkflowStub('SimpleWorkflow'); + $result[] = yield $untyped->execute(['untyped']); + + $execution = yield $untyped->getExecution(); + assert($execution instanceof Workflow\WorkflowExecution); + + return $result; + } +} diff --git a/tests/temporal/Workflow/ComplexExceptionalWorkflow.php b/tests/temporal/Workflow/ComplexExceptionalWorkflow.php new file mode 100644 index 00000000..bf65ccb2 --- /dev/null +++ b/tests/temporal/Workflow/ComplexExceptionalWorkflow.php @@ -0,0 +1,26 @@ +<?php + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityOptions; +use Temporal\Common\RetryOptions; +use Temporal\Tests\Activity\SimpleActivity; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; + +#[Workflow\WorkflowInterface] +class ComplexExceptionalWorkflow +{ + #[WorkflowMethod(name: 'ComplexExceptionalWorkflow')] + public function handler() + { + $child = Workflow::newChildWorkflowStub( + ExceptionalActivityWorkflow::class, + Workflow\ChildWorkflowOptions::new()->withRetryOptions( + (new RetryOptions())->withMaximumAttempts(1) + ) + ); + + return yield $child->handler(); + } +} diff --git a/tests/temporal/Workflow/ContinuableWorkflow.php b/tests/temporal/Workflow/ContinuableWorkflow.php new file mode 100644 index 00000000..78411414 --- /dev/null +++ b/tests/temporal/Workflow/ContinuableWorkflow.php @@ -0,0 +1,38 @@ +<?php + + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityOptions; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; +use Temporal\Tests\Activity\SimpleActivity; + +#[Workflow\WorkflowInterface] +class ContinuableWorkflow +{ + #[WorkflowMethod(name: 'ContinuableWorkflow')] + public function handler( + int $generation + ) { + $simple = Workflow::newActivityStub( + SimpleActivity::class, + ActivityOptions::new()->withStartToCloseTimeout(5) + ); + + if ($generation > 5) { + // complete + return "OK" . $generation; + } + + if ($generation !== 1) { + assert(!empty(Workflow::getInfo()->continuedExecutionRunId)); + } + + for ($i = 0; $i < $generation; $i++) { + yield $simple->echo((string)$generation); + } + + return Workflow::continueAsNew('ContinuableWorkflow', [++$generation]); + } +} diff --git a/tests/temporal/Workflow/EmptyWorkflow.php b/tests/temporal/Workflow/EmptyWorkflow.php new file mode 100644 index 00000000..57fb5e65 --- /dev/null +++ b/tests/temporal/Workflow/EmptyWorkflow.php @@ -0,0 +1,16 @@ +<?php + +namespace Temporal\Tests\Workflow; + +use Temporal\Workflow\WorkflowMethod; +use Temporal\Workflow; + +#[Workflow\WorkflowInterface] +class EmptyWorkflow +{ + #[WorkflowMethod] + public function handler() + { + return 42; + } +} diff --git a/tests/temporal/Workflow/ExceptionalActivityWorkflow.php b/tests/temporal/Workflow/ExceptionalActivityWorkflow.php new file mode 100644 index 00000000..e0ed0005 --- /dev/null +++ b/tests/temporal/Workflow/ExceptionalActivityWorkflow.php @@ -0,0 +1,25 @@ +<?php + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityOptions; +use Temporal\Common\RetryOptions; +use Temporal\Tests\Activity\SimpleActivity; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; + +#[Workflow\WorkflowInterface] +class ExceptionalActivityWorkflow +{ + #[WorkflowMethod(name: 'ExceptionalActivityWorkflow')] + public function handler() + { + $simple = Workflow::newActivityStub( + SimpleActivity::class, + ActivityOptions::new()->withStartToCloseTimeout(5) + ->withRetryOptions((new RetryOptions())->withMaximumAttempts(1)) + ); + + return yield $simple->fail(); + } +} diff --git a/tests/temporal/Workflow/ExceptionalWorkflow.php b/tests/temporal/Workflow/ExceptionalWorkflow.php new file mode 100644 index 00000000..9a3e907f --- /dev/null +++ b/tests/temporal/Workflow/ExceptionalWorkflow.php @@ -0,0 +1,18 @@ +<?php + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityOptions; +use Temporal\Tests\Activity\SimpleActivity; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; + +#[Workflow\WorkflowInterface] +class ExceptionalWorkflow +{ + #[WorkflowMethod(name: 'ExceptionalWorkflow')] + public function handler() + { + throw new \RuntimeException("workflow error"); + } +} diff --git a/tests/temporal/Workflow/FailedHeartbeatWorkflow.php b/tests/temporal/Workflow/FailedHeartbeatWorkflow.php new file mode 100644 index 00000000..e857f100 --- /dev/null +++ b/tests/temporal/Workflow/FailedHeartbeatWorkflow.php @@ -0,0 +1,30 @@ +<?php + +declare(strict_types=1); + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityOptions; +use Temporal\Common\RetryOptions; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; +use Temporal\Tests\Activity\HeartBeatActivity; + +#[Workflow\WorkflowInterface] +class FailedHeartbeatWorkflow +{ + #[WorkflowMethod(name: 'FailedHeartbeatWorkflow')] + public function handler( + int $iterations + ): iterable { + $act = Workflow::newActivityStub( + HeartBeatActivity::class, + ActivityOptions::new() + ->withStartToCloseTimeout(50) + // will fail on first attempt + ->withRetryOptions(RetryOptions::new()->withMaximumAttempts(2)) + ); + + return yield $act->failedActivity($iterations); + } +} diff --git a/tests/temporal/Workflow/LoopWithSignalCoroutinesWorkflow.php b/tests/temporal/Workflow/LoopWithSignalCoroutinesWorkflow.php new file mode 100644 index 00000000..c389fd78 --- /dev/null +++ b/tests/temporal/Workflow/LoopWithSignalCoroutinesWorkflow.php @@ -0,0 +1,55 @@ +<?php + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityOptions; +use Temporal\Tests\Activity\SimpleActivity; +use Temporal\Workflow; +use Temporal\Workflow\SignalMethod; +use Temporal\Workflow\WorkflowMethod; + +#[Workflow\WorkflowInterface] +class LoopWithSignalCoroutinesWorkflow +{ + private array $values = []; + private array $result = []; + private $simple; + + public function __construct() + { + $this->simple = Workflow::newActivityStub( + SimpleActivity::class, + ActivityOptions::new()->withStartToCloseTimeout(5) + ); + } + + #[SignalMethod] + public function addValue( + string $value + ) { + $value = yield $this->simple->prefix('in signal ', $value); + $value = yield $this->simple->prefix('in signal 2 ', $value); + + $this->values[] = $value; + } + + #[WorkflowMethod(name: 'LoopWithSignalCoroutinesWorkflow')] + public function run( + int $count + ) { + while (true) { + yield Workflow::await(fn() => $this->values !== []); + $value = array_shift($this->values); + + // uppercases + $this->result[] = yield $this->simple->echo($value); + + if (count($this->result) === $count) { + break; + } + } + + asort($this->result); + return array_values($this->result); + } +} diff --git a/tests/temporal/Workflow/LoopWorkflow.php b/tests/temporal/Workflow/LoopWorkflow.php new file mode 100644 index 00000000..97d7a3aa --- /dev/null +++ b/tests/temporal/Workflow/LoopWorkflow.php @@ -0,0 +1,51 @@ +<?php + + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityOptions; +use Temporal\Tests\Activity\SimpleActivity; +use Temporal\Workflow; +use Temporal\Workflow\SignalMethod; +use Temporal\Workflow\WorkflowMethod; + +#[Workflow\WorkflowInterface] +class LoopWorkflow +{ + private array $values = []; + private array $result = []; + private $simple; + + public function __construct() + { + $this->simple = Workflow::newActivityStub( + SimpleActivity::class, + ActivityOptions::new()->withStartToCloseTimeout(5) + ); + } + + #[SignalMethod] + public function addValue( + string $value + ) { + $this->values[] = $value; + } + + #[WorkflowMethod(name: 'LoopWorkflow')] + public function run( + int $count + ) { + while (true) { + yield Workflow::await(fn() => $this->values !== []); + $value = array_shift($this->values); + + $this->result[] = yield $this->simple->echo($value); + + if (count($this->result) === $count) { + break; + } + } + + return $this->result; + } +} diff --git a/tests/temporal/Workflow/ParallelScopesWorkflow.php b/tests/temporal/Workflow/ParallelScopesWorkflow.php new file mode 100644 index 00000000..8a2303f4 --- /dev/null +++ b/tests/temporal/Workflow/ParallelScopesWorkflow.php @@ -0,0 +1,36 @@ +<?php + +declare(strict_types=1); + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityOptions; +use Temporal\Promise; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; +use Temporal\Tests\Activity\SimpleActivity; + +#[Workflow\WorkflowInterface] +class ParallelScopesWorkflow +{ + #[WorkflowMethod(name: 'ParallelScopesWorkflow')] + public function handler(string $input) + { + $simple = Workflow::newActivityStub( + SimpleActivity::class, + ActivityOptions::new()->withStartToCloseTimeout(5) + ); + + $a = Workflow::newCancellationScope(function () use ($simple, $input) { + return yield $simple->echo($input); + }); + + $b = Workflow::newCancellationScope(function () use ($simple, $input) { + return yield $simple->lower($input); + }); + + [$ra, $rb] = yield Promise::all([$a, $b]); + + return sprintf('%s|%s|%s', $ra, $input, $rb); + } +} diff --git a/tests/temporal/Workflow/PeriodicWorkflow.php b/tests/temporal/Workflow/PeriodicWorkflow.php new file mode 100644 index 00000000..08f5f2fa --- /dev/null +++ b/tests/temporal/Workflow/PeriodicWorkflow.php @@ -0,0 +1,19 @@ +<?php + +namespace Temporal\Tests\Workflow; + +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; + +#[Workflow\WorkflowInterface] +class PeriodicWorkflow +{ + #[WorkflowMethod(name: 'PeriodicWorkflow')] + public function handler() + { + error_log("GOT SOMETHING" . print_r(Workflow::getLastCompletionResult(), true)); + + // todo: get last completion result + return 'OK'; + } +} diff --git a/tests/temporal/Workflow/ProtoPayloadWorkflow.php b/tests/temporal/Workflow/ProtoPayloadWorkflow.php new file mode 100644 index 00000000..7adbed1e --- /dev/null +++ b/tests/temporal/Workflow/ProtoPayloadWorkflow.php @@ -0,0 +1,33 @@ +<?php + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityOptions; +use Temporal\Api\Common\V1\WorkflowExecution; +use Temporal\Tests\Activity\SimpleActivity; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; + +#[Workflow\WorkflowInterface] +class ProtoPayloadWorkflow +{ + #[WorkflowMethod(name: 'ProtoPayloadWorkflow')] + public function handler(): iterable + { + $simple = Workflow::newActivityStub( + SimpleActivity::class, + ActivityOptions::new()->withStartToCloseTimeout(5) + ); + + $e = new WorkflowExecution(); + $e->setWorkflowId('workflow id'); + $e->setRunId('run id'); + + /** @var WorkflowExecution $e2 */ + $e2 = yield $simple->updateRunID($e); + assert($e2->getWorkflowId() === $e->getWorkflowId()); + assert($e2->getRunId() === 'updated'); + + return $e2; + } +} diff --git a/tests/temporal/Workflow/QueryWorkflow.php b/tests/temporal/Workflow/QueryWorkflow.php new file mode 100644 index 00000000..96e41582 --- /dev/null +++ b/tests/temporal/Workflow/QueryWorkflow.php @@ -0,0 +1,41 @@ +<?php + +/** + * This file is part of Temporal package. + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Temporal\Tests\Workflow; + +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; + +#[Workflow\WorkflowInterface] +class QueryWorkflow +{ + private int $counter = 0; + + #[Workflow\SignalMethod(name: "add")] + public function add( + int $value + ) { + $this->counter += $value; + } + + #[Workflow\QueryMethod(name: "get")] + public function get(): int + { + return $this->counter; + } + + #[WorkflowMethod] + public function handler() + { + // collect signals during one second + yield Workflow::timer(1); + + return $this->counter; + } +} diff --git a/tests/temporal/Workflow/RuntimeSignalWorkflow.php b/tests/temporal/Workflow/RuntimeSignalWorkflow.php new file mode 100644 index 00000000..f700af72 --- /dev/null +++ b/tests/temporal/Workflow/RuntimeSignalWorkflow.php @@ -0,0 +1,31 @@ +<?php + +namespace Temporal\Tests\Workflow; + +use React\Promise\Deferred; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; + +#[Workflow\WorkflowInterface] +class RuntimeSignalWorkflow +{ + #[WorkflowMethod] + public function handler() + { + $wait1 = new Deferred(); + $wait2 = new Deferred(); + + $counter = 0; + + Workflow::registerSignal('add', function ($value) use (&$counter, $wait1, $wait2) { + $counter += $value; + $wait1->resolve($value); + $wait2->resolve($value); + }); + + yield $wait1; + yield $wait2; + + return $counter; + } +} diff --git a/tests/temporal/Workflow/SagaWorkflow.php b/tests/temporal/Workflow/SagaWorkflow.php new file mode 100644 index 00000000..e47c0203 --- /dev/null +++ b/tests/temporal/Workflow/SagaWorkflow.php @@ -0,0 +1,54 @@ +<?php + +/** + * This file is part of Temporal package. + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityOptions; +use Temporal\Common\RetryOptions; +use Temporal\Tests\Activity\SimpleActivity; +use Temporal\Workflow; + +#[Workflow\WorkflowInterface] +class SagaWorkflow +{ + #[Workflow\WorkflowMethod(name: 'SagaWorkflow')] + public function run() + { + $simple = Workflow::newActivityStub( + SimpleActivity::class, + ActivityOptions::new() + ->withStartToCloseTimeout(60) + ->withRetryOptions(RetryOptions::new()->withMaximumAttempts(1)) + ); + + $saga = new Workflow\Saga(); + $saga->setParallelCompensation(true); + + try { + yield $simple->echo('test'); + $saga->addCompensation( + function () use ($simple) { + yield $simple->echo('compensate echo'); + } + ); + + yield $simple->lower('TEST'); + $saga->addCompensation( + function () use ($simple) { + yield $simple->lower('COMPENSATE LOWER'); + } + ); + + yield $simple->fail(); + } catch (\Throwable $e) { + yield $saga->compensate(); + throw $e; + } + } +} diff --git a/tests/temporal/Workflow/SideEffectWorkflow.php b/tests/temporal/Workflow/SideEffectWorkflow.php new file mode 100644 index 00000000..95d396e4 --- /dev/null +++ b/tests/temporal/Workflow/SideEffectWorkflow.php @@ -0,0 +1,30 @@ +<?php + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityOptions; +use Temporal\Common\Uuid; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; +use Temporal\Tests\Activity\SimpleActivity; + +#[Workflow\WorkflowInterface] +class SideEffectWorkflow +{ + #[WorkflowMethod(name: 'SideEffectWorkflow')] + public function handler(string $input): iterable + { + $simple = Workflow::newActivityStub( + SimpleActivity::class, + ActivityOptions::new()->withStartToCloseTimeout(5) + ); + + $result = yield Workflow::sideEffect( + function () use ($input) { + return $input . '-42'; + } + ); + + return yield $simple->lower($result); + } +} diff --git a/tests/temporal/Workflow/SignalChildViaStubWorkflow.php b/tests/temporal/Workflow/SignalChildViaStubWorkflow.php new file mode 100644 index 00000000..828086fc --- /dev/null +++ b/tests/temporal/Workflow/SignalChildViaStubWorkflow.php @@ -0,0 +1,25 @@ +<?php + +namespace Temporal\Tests\Workflow; + +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; + +#[Workflow\WorkflowInterface] +class SignalChildViaStubWorkflow +{ + #[WorkflowMethod(name: 'SignalChildViaStubWorkflow')] + public function handler() + { + // typed stub + $simple = Workflow::newChildWorkflowStub(SimpleSignalledWorkflow::class); + + // start execution + $call = $simple->handler(); + + yield $simple->add(8); + + // expects 8 + return yield $call; + } +} diff --git a/tests/temporal/Workflow/SimpleDTOWorkflow.php b/tests/temporal/Workflow/SimpleDTOWorkflow.php new file mode 100644 index 00000000..bd39a0a0 --- /dev/null +++ b/tests/temporal/Workflow/SimpleDTOWorkflow.php @@ -0,0 +1,35 @@ +<?php + +declare(strict_types=1); + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityOptions; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; +use Temporal\Tests\Activity\SimpleActivity; +use Temporal\Tests\DTO\Message; +use Temporal\Tests\DTO\User; + +#[Workflow\WorkflowInterface] +class SimpleDTOWorkflow +{ + #[WorkflowMethod(name: 'SimpleDTOWorkflow')]//, returnType: Message::class)] + public function handler( + User $user + ) { + $simple = Workflow::newActivityStub( + SimpleActivity::class, + ActivityOptions::new() + ->withStartToCloseTimeout(5) + ); + + $value = yield $simple->greet($user); + + if (!$value instanceof Message) { + return "FAIL"; + } + + return $value; + } +} diff --git a/tests/temporal/Workflow/SimpleHeartbeatWorkflow.php b/tests/temporal/Workflow/SimpleHeartbeatWorkflow.php new file mode 100644 index 00000000..c9999cd1 --- /dev/null +++ b/tests/temporal/Workflow/SimpleHeartbeatWorkflow.php @@ -0,0 +1,25 @@ +<?php + +declare(strict_types=1); + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityOptions; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; +use Temporal\Tests\Activity\HeartBeatActivity; + +#[Workflow\WorkflowInterface] +class SimpleHeartbeatWorkflow +{ + #[WorkflowMethod(name: 'SimpleHeartbeatWorkflow')] + public function handler(int $iterations): iterable + { + $act = Workflow::newActivityStub( + HeartBeatActivity::class, + ActivityOptions::new()->withStartToCloseTimeout(50) + ); + + return yield $act->doSomething($iterations); + } +} diff --git a/tests/temporal/Workflow/SimpleSignalledWorkflow.php b/tests/temporal/Workflow/SimpleSignalledWorkflow.php new file mode 100644 index 00000000..0df25a65 --- /dev/null +++ b/tests/temporal/Workflow/SimpleSignalledWorkflow.php @@ -0,0 +1,30 @@ +<?php + +declare(strict_types=1); + +namespace Temporal\Tests\Workflow; + +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; + +#[Workflow\WorkflowInterface] +class SimpleSignalledWorkflow +{ + private $counter = 0; + + #[Workflow\SignalMethod(name: "add")] + public function add( + int $value + ) { + $this->counter += $value; + } + + #[WorkflowMethod(name: 'SimpleSignalledWorkflow')] + public function handler(): iterable + { + // collect signals during one second + yield Workflow::timer(1); + + return $this->counter; + } +} diff --git a/tests/temporal/Workflow/SimpleSignalledWorkflowWithSleep.php b/tests/temporal/Workflow/SimpleSignalledWorkflowWithSleep.php new file mode 100644 index 00000000..d10ba04a --- /dev/null +++ b/tests/temporal/Workflow/SimpleSignalledWorkflowWithSleep.php @@ -0,0 +1,34 @@ +<?php + +declare(strict_types=1); + +namespace Temporal\Tests\Workflow; + +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; + +#[Workflow\WorkflowInterface] +class SimpleSignalledWorkflowWithSleep +{ + private $counter = 0; + + #[Workflow\SignalMethod(name: "add")] + public function add( + int $value + ) { + $this->counter += $value; + } + + #[WorkflowMethod(name: 'SimpleSignalledWorkflowWithSleep')] + public function handler(): iterable + { + // collect signals during one second + yield Workflow::timer(1); + + if (!Workflow::isReplaying()) { + sleep(1); + } + + return $this->counter; + } +} diff --git a/tests/temporal/Workflow/SimpleWorkflow.php b/tests/temporal/Workflow/SimpleWorkflow.php new file mode 100644 index 00000000..36e12f69 --- /dev/null +++ b/tests/temporal/Workflow/SimpleWorkflow.php @@ -0,0 +1,31 @@ +<?php + +declare(strict_types=1); + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityOptions; +use Temporal\Common\RetryOptions; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; +use Temporal\Tests\Activity\SimpleActivity; + +#[Workflow\WorkflowInterface] +class SimpleWorkflow +{ + #[WorkflowMethod(name: 'SimpleWorkflow')] + public function handler( + string $input + ): iterable { + $simple = Workflow::newActivityStub( + SimpleActivity::class, + ActivityOptions::new() + ->withStartToCloseTimeout(5) + ->withRetryOptions( + RetryOptions::new()->withMaximumAttempts(2) + ) + ); + + return yield $simple->echo($input); + } +} diff --git a/tests/temporal/Workflow/TimerWorkflow.php b/tests/temporal/Workflow/TimerWorkflow.php new file mode 100644 index 00000000..ab60d6c9 --- /dev/null +++ b/tests/temporal/Workflow/TimerWorkflow.php @@ -0,0 +1,27 @@ +<?php + +declare(strict_types=1); + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityOptions; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; +use Temporal\Tests\Activity\SimpleActivity; + +#[Workflow\WorkflowInterface] +class TimerWorkflow +{ + #[WorkflowMethod(name: 'TimerWorkflow')] + public function handler(string $input): iterable + { + $simple = Workflow::newActivityStub( + SimpleActivity::class, + ActivityOptions::new()->withStartToCloseTimeout(5) + ); + + yield Workflow::timer(1); + + return yield $simple->lower($input); + } +} diff --git a/tests/temporal/Workflow/WaitWorkflow.php b/tests/temporal/Workflow/WaitWorkflow.php new file mode 100644 index 00000000..826952c1 --- /dev/null +++ b/tests/temporal/Workflow/WaitWorkflow.php @@ -0,0 +1,33 @@ +<?php + + +namespace Temporal\Tests\Workflow; + + +use Temporal\Workflow; +use Temporal\Workflow\SignalMethod; +use Temporal\Workflow\WorkflowInterface; +use Temporal\Workflow\WorkflowMethod; + +#[WorkflowInterface] +class WaitWorkflow +{ + private bool $ready = false; + private string $value; + + #[SignalMethod] + public function unlock( + string $value + ) { + $this->ready = true; + $this->value = $value; + } + + #[WorkflowMethod(name: 'WaitWorkflow')] + public function run() + { + yield Workflow::await(fn() => $this->ready); + + return $this->value; + } +} diff --git a/tests/temporal/Workflow/WithChildStubWorkflow.php b/tests/temporal/Workflow/WithChildStubWorkflow.php new file mode 100644 index 00000000..cdebe3d8 --- /dev/null +++ b/tests/temporal/Workflow/WithChildStubWorkflow.php @@ -0,0 +1,20 @@ +<?php + +declare(strict_types=1); + +namespace Temporal\Tests\Workflow; + +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; + +#[Workflow\WorkflowInterface] +class WithChildStubWorkflow +{ + #[WorkflowMethod(name: 'WithChildStubWorkflow')] + public function handler(string $input): iterable + { + $child = Workflow::newChildWorkflowStub(SimpleWorkflow::class); + + return 'Child: ' . (yield $child->handler('child ' . $input)); + } +} diff --git a/tests/temporal/Workflow/WithChildWorkflow.php b/tests/temporal/Workflow/WithChildWorkflow.php new file mode 100644 index 00000000..aac0979b --- /dev/null +++ b/tests/temporal/Workflow/WithChildWorkflow.php @@ -0,0 +1,25 @@ +<?php + +declare(strict_types=1); + +namespace Temporal\Tests\Workflow; + +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; + +#[Workflow\WorkflowInterface] +class WithChildWorkflow +{ + #[WorkflowMethod(name: 'WithChildWorkflow')] + public function handler( + string $input + ): iterable { + $result = yield Workflow::executeChildWorkflow( + 'SimpleWorkflow', + ['child ' . $input], + Workflow\ChildWorkflowOptions::new() + ); + + return 'Child: ' . $result; + } +} diff --git a/tests/temporal/Workflow/WorkflowWithSequence.php b/tests/temporal/Workflow/WorkflowWithSequence.php new file mode 100644 index 00000000..9e813a9c --- /dev/null +++ b/tests/temporal/Workflow/WorkflowWithSequence.php @@ -0,0 +1,30 @@ +<?php + + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityOptions; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; +use Temporal\Tests\Activity\SimpleActivity; + +#[Workflow\WorkflowInterface] +class WorkflowWithSequence +{ + #[WorkflowMethod(name: 'WorkflowWithSequence')] + public function handler() + { + $simple = Workflow::newActivityStub( + SimpleActivity::class, + ActivityOptions::new()->withStartToCloseTimeout(5) + ); + + $a = $simple->echo('a'); + $b = $simple->echo('b'); + + yield $a; + yield $b; + + return 'OK'; + } +} diff --git a/tests/temporal/Workflow/WorkflowWithSignalledSteps.php b/tests/temporal/Workflow/WorkflowWithSignalledSteps.php new file mode 100644 index 00000000..5f1af766 --- /dev/null +++ b/tests/temporal/Workflow/WorkflowWithSignalledSteps.php @@ -0,0 +1,51 @@ +<?php + + +namespace Temporal\Tests\Workflow; + +use React\Promise\Deferred; +use React\Promise\PromiseInterface; +use Temporal\Activity\ActivityOptions; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowMethod; +use Temporal\Tests\Activity\SimpleActivity; + +#[Workflow\WorkflowInterface] +class WorkflowWithSignalledSteps +{ + #[WorkflowMethod(name: 'WorkflowWithSignalledSteps')] + public function handler() + { + $simple = Workflow::newActivityStub( + SimpleActivity::class, + ActivityOptions::new()->withStartToCloseTimeout(5) + ); + + $value = 0; + Workflow::registerQuery('value', function () use (&$value) { + return $value; + }); + + yield $this->promiseSignal('begin'); + $value++; + + yield $this->promiseSignal('next1'); + $value++; + + yield $this->promiseSignal('next2'); + $value++; + + return $value; + } + + // is this correct? + private function promiseSignal(string $name): PromiseInterface + { + $signal = new Deferred(); + Workflow::registerSignal($name, function ($value) use ($signal) { + $signal->resolve($value); + }); + + return $signal->promise(); + } +} |