diff options
author | Valery Piashchynski <[email protected]> | 2021-01-27 14:34:31 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2021-01-27 14:34:31 +0300 |
commit | 6dd131497808f414ac1cb952d4b0b89b9e0689f8 (patch) | |
tree | f7af7d7d494d1f5ca272af1ad0b978fe44d685a9 | |
parent | e2266b80db47444ba5858c736833a8a81b1361ad (diff) | |
parent | 744c2b0c86b88f77e681f8660bf3a476e83711b8 (diff) |
Merge pull request #507 from spiral/refactor/temporal-plugins
refactoring(temporal): Move temporal plugin to the https://github.com/temporalio/roadrunner-temporal repository
90 files changed, 9 insertions, 6504 deletions
diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index 076de74c..27df293a 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -69,9 +69,6 @@ jobs: go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pool.txt -covermode=atomic ./pkg/pool go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker.txt -covermode=atomic ./pkg/worker go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker_stack.txt -covermode=atomic ./pkg/worker_watcher - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/temporal.out -covermode=atomic ./tests/plugins/temporal - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/temporal_protocol.out -covermode=atomic ./plugins/temporal/protocol - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/temporal_workflow.out -covermode=atomic ./plugins/temporal/workflow go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/http_config.txt -covermode=atomic ./plugins/http/config go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/http.txt -covermode=atomic ./tests/plugins/http go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/informer.txt -covermode=atomic ./tests/plugins/informer diff --git a/.github/workflows/macos.yml b/.github/workflows/macos.yml index 00b51598..77f9cfda 100644 --- a/.github/workflows/macos.yml +++ b/.github/workflows/macos.yml @@ -67,8 +67,6 @@ jobs: go test -v -race -tags=debug ./pkg/pool go test -v -race -tags=debug ./pkg/worker go test -v -race -tags=debug ./pkg/worker_watcher - go test -v -race -tags=debug ./plugins/temporal/protocol - go test -v -race -tags=debug ./plugins/temporal/workflow go test -v -race -tags=debug ./plugins/http/config go test -v -race -tags=debug ./tests/plugins/http go test -v -race -tags=debug ./tests/plugins/informer diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml deleted file mode 100644 index 785f40ad..00000000 --- a/.github/workflows/release.yml +++ /dev/null @@ -1,139 +0,0 @@ -name: release - -on: - release: # Docs: <https://help.github.com/en/articles/events-that-trigger-workflows#release-event-release> - types: [published] - -jobs: - build: - name: Build for ${{ matrix.os }} (${{ matrix.arch }}, ${{ matrix.compiler }}) - runs-on: ubuntu-20.04 - strategy: - fail-fast: false - matrix: - os: [windows, freebsd, darwin] # linux, freebsd, darwin, windows - compiler: [gcc] # gcc, musl-gcc - archiver: [zip] # tar, zip - arch: [amd64] # amd64, 386 - include: - - os: linux - compiler: gcc - archiver: tar - arch: amd64 - - os: '' - compiler: musl-gcc # more info: <https://musl.libc.org/> - archiver: zip - arch: amd64 - steps: - - name: Set up Go - uses: actions/setup-go@v2 - with: - go-version: 1.15.6 - - - name: Check out code - uses: actions/checkout@v2 - - - name: Install musl - if: matrix.compiler == 'musl-gcc' - run: sudo apt-get install -y musl-tools - - - name: Download dependencies - run: go mod download # `-x` means "verbose" mode - - - name: Generate builder values - id: values - run: | - echo "::set-output name=version::`echo ${GITHUB_REF##*/} | sed -e 's/^[vV ]*//'`" - echo "::set-output name=timestamp::`date +%FT%T%z`" - echo "::set-output name=binary-name::rr`[ ${{ matrix.os }} = 'windows' ] && echo '.exe'`" - - - name: Compile binary file - env: - GOOS: ${{ matrix.os }} - GOARCH: ${{ matrix.arch }} - CC: ${{ matrix.compiler }} - CGO_ENABLED: 0 - LDFLAGS: >- - -s - -X github.com/spiral/roadrunner/cmd/cli.Version=${{ steps.values.outputs.version }} - -X github.com/spiral/roadrunner/cmd/cli.BuildTime=${{ steps.values.outputs.timestamp }} - run: | - go build -trimpath -ldflags "$LDFLAGS" -o "./${{ steps.values.outputs.binary-name }}" ./cmd/main.go - stat "./${{ steps.values.outputs.binary-name }}" - - - name: Generate distributive directory name - id: dist-dir - run: > - echo "::set-output name=name::roadrunner-${{ steps.values.outputs.version }}-$( - [ ${{ matrix.os }} != '' ] && echo '${{ matrix.os }}' || echo 'unknown' - )$( - [ ${{ matrix.compiler }} = 'musl-gcc' ] && echo '-musl' - )-${{ matrix.arch }}" - - - name: Generate distributive archive name - id: dist-arch - run: > - echo "::set-output name=name::${{ steps.dist-dir.outputs.name }}.$( - case ${{ matrix.archiver }} in - zip) echo 'zip';; - tar) echo 'tar.gz';; - *) exit 10; - esac - )" - - - name: Create distributive - run: | - mkdir ${{ steps.dist-dir.outputs.name }} - mv "./${{ steps.values.outputs.binary-name }}" ./${{ steps.dist-dir.outputs.name }}/ - cp ./README.md ./CHANGELOG.md ./LICENSE ./${{ steps.dist-dir.outputs.name }}/ - - - name: Pack distributive using tar - if: matrix.archiver == 'tar' - run: tar -zcf "${{ steps.dist-arch.outputs.name }}" "${{ steps.dist-dir.outputs.name }}" - - - name: Pack distributive using zip - if: matrix.archiver == 'zip' - run: zip -r -q "${{ steps.dist-arch.outputs.name }}" "${{ steps.dist-dir.outputs.name }}" - - - name: Upload artifact - uses: actions/upload-artifact@v2 - with: - name: ${{ steps.dist-dir.outputs.name }} - path: ${{ steps.dist-arch.outputs.name }} - if-no-files-found: error - retention-days: 30 - - - name: Upload binaries to release - uses: svenstaro/upload-release-action@v2 - with: - repo_token: ${{ secrets.GITHUB_TOKEN }} - file: ${{ steps.dist-arch.outputs.name }} - asset_name: ${{ steps.dist-arch.outputs.name }} - tag: ${{ github.ref }} - - docker: - name: Build docker image - runs-on: ubuntu-20.04 - steps: - - name: Check out code - uses: actions/checkout@v2 - - - name: Make docker login - run: echo "${{ secrets.DOCKER_PASSWORD }}" | docker login -u "${{ secrets.DOCKER_LOGIN }}" --password-stdin - - - name: Generate builder values - id: values - run: | - echo "::set-output name=version::`echo ${GITHUB_REF##*/} | sed -e 's/^[vV ]*//'`" - echo "::set-output name=timestamp::`date +%FT%T%z`" - - - name: Build image - run: | - docker build \ - --tag "spiralscout/roadrunner:${{ steps.values.outputs.version }}" \ - --build-arg "APP_VERSION=${{ steps.values.outputs.version }}" \ - --build-arg "BUILD_TIME=${{ steps.values.outputs.timestamp }}" \ - . - - - name: Push image into registry - run: docker push "spiralscout/roadrunner:${{ steps.values.outputs.version }}" @@ -32,9 +32,6 @@ test_coverage: go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/pool.out -covermode=atomic ./pkg/pool go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/worker.out -covermode=atomic ./pkg/worker go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/worker_stack.out -covermode=atomic ./pkg/worker_watcher - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/temporal.out -covermode=atomic ./tests/plugins/temporal - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/temporal_protocol.out -covermode=atomic ./plugins/temporal/protocol - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/temporal_workflow.out -covermode=atomic ./plugins/temporal/workflow go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/http.out -covermode=atomic ./tests/plugins/http go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/informer.out -covermode=atomic ./tests/plugins/informer go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/reload.out -covermode=atomic ./tests/plugins/reload @@ -66,9 +63,6 @@ test: ## Run application tests go test -v -race -tags=debug ./pkg/pool go test -v -race -tags=debug ./pkg/worker go test -v -race -tags=debug ./pkg/worker_watcher - go test -v -race -tags=debug ./tests/plugins/temporal - go test -v -race -tags=debug ./plugins/temporal/protocol - go test -v -race -tags=debug ./plugins/temporal/workflow go test -v -race -tags=debug ./tests/plugins/http go test -v -race -tags=debug ./plugins/http/config go test -v -race -tags=debug ./tests/plugins/informer diff --git a/cmd/cli/reset.go b/cmd/cli/reset.go deleted file mode 100644 index a5055a53..00000000 --- a/cmd/cli/reset.go +++ /dev/null @@ -1,107 +0,0 @@ -package cli - -import ( - "fmt" - "sync" - - "github.com/fatih/color" - "github.com/mattn/go-runewidth" - "github.com/spf13/cobra" - "github.com/spiral/errors" - "github.com/vbauerster/mpb/v5" - "github.com/vbauerster/mpb/v5/decor" -) - -// List is the resetter.List RPC method -const List string = "resetter.List" - -// Reset is the resetter.Reset RPC method -const Reset string = "resetter.Reset" - -func init() { - root.AddCommand(&cobra.Command{ - Use: "reset", - Short: "Reset workers of all or specific RoadRunner service", - RunE: resetHandler, - }) -} - -func resetHandler(_ *cobra.Command, args []string) error { - const op = errors.Op("reset_handler") - client, err := RPCClient() - if err != nil { - return err - } - defer func() { - _ = client.Close() - }() - - var services []string - if len(args) != 0 { - services = args - } else { - err = client.Call(List, true, &services) - if err != nil { - return errors.E(op, err) - } - } - - var wg sync.WaitGroup - pr := mpb.New(mpb.WithWaitGroup(&wg), mpb.WithWidth(6)) - wg.Add(len(services)) - - for _, service := range services { - var ( - bar *mpb.Bar - name = runewidth.FillRight(fmt.Sprintf("Resetting plugin: [%s]", color.HiYellowString(service)), 27) - result = make(chan interface{}) - ) - - bar = pr.AddSpinner( - 1, - mpb.SpinnerOnMiddle, - mpb.SpinnerStyle([]string{"∙∙∙", "●∙∙", "∙●∙", "∙∙●", "∙∙∙"}), - mpb.PrependDecorators(decor.Name(name)), - mpb.AppendDecorators(onComplete(result)), - ) - - // simulating some work - go func(service string, result chan interface{}) { - defer wg.Done() - defer bar.Increment() - - var done bool - err = client.Call(Reset, service, &done) - if err != nil { - result <- errors.E(op, err) - return - } - result <- nil - }(service, result) - } - - pr.Wait() - return nil -} - -func onComplete(result chan interface{}) decor.Decorator { - var ( - msg = "" - fn = func(s decor.Statistics) string { - select { - case r := <-result: - if err, ok := r.(error); ok { - msg = color.HiRedString(err.Error()) - return msg - } - - msg = color.HiGreenString("done") - return msg - default: - return msg - } - } - ) - - return decor.Any(fn) -} diff --git a/cmd/cli/root.go b/cmd/cli/root.go deleted file mode 100644 index 6f73aecf..00000000 --- a/cmd/cli/root.go +++ /dev/null @@ -1,129 +0,0 @@ -package cli - -import ( - "log" - "net/http/pprof" - "net/rpc" - "os" - "path/filepath" - - "github.com/spiral/errors" - goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" - rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" - - "github.com/spiral/roadrunner/v2/plugins/config" - - "net/http" - - "github.com/spf13/cobra" - endure "github.com/spiral/endure/pkg/container" -) - -var ( - // WorkDir is working directory - WorkDir string - // CfgFile is path to the .rr.yaml - CfgFile string - // Debug mode - Debug bool - // Container is the pointer to the Endure container - Container *endure.Endure - cfg *config.Viper - root = &cobra.Command{ - Use: "rr", - SilenceErrors: true, - SilenceUsage: true, - Version: Version, - } -) - -func Execute() { - if err := root.Execute(); err != nil { - // exit with error, fatal invoke os.Exit(1) - log.Fatal(err) - } -} - -func init() { - root.PersistentFlags().StringVarP(&CfgFile, "config", "c", ".rr.yaml", "config file (default is .rr.yaml)") - root.PersistentFlags().StringVarP(&WorkDir, "WorkDir", "w", "", "work directory") - root.PersistentFlags().BoolVarP(&Debug, "debug", "d", false, "debug mode") - cobra.OnInitialize(func() { - if CfgFile != "" { - if absPath, err := filepath.Abs(CfgFile); err == nil { - CfgFile = absPath - - // force working absPath related to config file - if err := os.Chdir(filepath.Dir(absPath)); err != nil { - panic(err) - } - } - } - - if WorkDir != "" { - if err := os.Chdir(WorkDir); err != nil { - panic(err) - } - } - - cfg = &config.Viper{} - cfg.Path = CfgFile - cfg.Prefix = "rr" - - // register config - err := Container.Register(cfg) - if err != nil { - panic(err) - } - - // if debug mode is on - run debug server - if Debug { - runDebugServer() - } - }) -} - -// RPCClient is using to make a requests to the ./rr reset, ./rr workers -func RPCClient() (*rpc.Client, error) { - rpcConfig := &rpcPlugin.Config{} - - err := cfg.Init() - if err != nil { - return nil, err - } - - if !cfg.Has(rpcPlugin.PluginName) { - return nil, errors.E("rpc service disabled") - } - - err = cfg.UnmarshalKey(rpcPlugin.PluginName, rpcConfig) - if err != nil { - return nil, err - } - rpcConfig.InitDefaults() - - conn, err := rpcConfig.Dialer() - if err != nil { - return nil, err - } - - return rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)), nil -} - -// debug server -func runDebugServer() { - mux := http.NewServeMux() - mux.HandleFunc("/debug/pprof/", pprof.Index) - mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) - mux.HandleFunc("/debug/pprof/profile", pprof.Profile) - mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) - mux.HandleFunc("/debug/pprof/trace", pprof.Trace) - srv := http.Server{ - Addr: ":6061", - Handler: mux, - } - - if err := srv.ListenAndServe(); err != nil { - log.Fatal(err) - } -} diff --git a/cmd/cli/serve.go b/cmd/cli/serve.go deleted file mode 100644 index 993ec477..00000000 --- a/cmd/cli/serve.go +++ /dev/null @@ -1,63 +0,0 @@ -package cli - -import ( - "log" - "os" - "os/signal" - "syscall" - - "github.com/spf13/cobra" - "github.com/spiral/errors" - "go.uber.org/multierr" -) - -func init() { - root.AddCommand(&cobra.Command{ - Use: "serve", - Short: "Start RoadRunner server", - RunE: handler, - }) -} - -func handler(_ *cobra.Command, _ []string) error { - const op = errors.Op("handle_serve_command") - /* - We need to have path to the config at the RegisterTarget stage - But after cobra.Execute, because cobra fills up cli variables on this stage - */ - - err := Container.Init() - if err != nil { - return errors.E(op, err) - } - - errCh, err := Container.Serve() - if err != nil { - return errors.E(op, err) - } - - // https://golang.org/pkg/os/signal/#Notify - // should be of buffer size at least 1 - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGINT) - - for { - select { - case e := <-errCh: - err = multierr.Append(err, e.Error) - log.Printf("error occurred: %v, service: %s", e.Error.Error(), e.VertexID) - er := Container.Stop() - if er != nil { - err = multierr.Append(err, er) - return errors.E(op, err) - } - return errors.E(op, err) - case <-c: - err = Container.Stop() - if err != nil { - return errors.E(op, err) - } - return nil - } - } -} diff --git a/cmd/cli/version.go b/cmd/cli/version.go deleted file mode 100644 index 89728bd2..00000000 --- a/cmd/cli/version.go +++ /dev/null @@ -1,9 +0,0 @@ -package cli - -var ( - // Version - defines build version. - Version string = "local" - - // BuildTime - defined build time. - BuildTime string = "development" -) diff --git a/cmd/cli/workers.go b/cmd/cli/workers.go deleted file mode 100644 index 09642a58..00000000 --- a/cmd/cli/workers.go +++ /dev/null @@ -1,109 +0,0 @@ -package cli - -import ( - "fmt" - "log" - "net/rpc" - "os" - "os/signal" - "syscall" - "time" - - tm "github.com/buger/goterm" - "github.com/fatih/color" - "github.com/spf13/cobra" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/informer" - "github.com/spiral/roadrunner/v2/tools" -) - -// use interactive mode -var interactive bool - -const InformerList string = "informer.List" -const InformerWorkers string = "informer.Workers" - -func init() { - workersCommand := &cobra.Command{ - Use: "workers", - Short: "Show information about active roadrunner workers", - RunE: workersHandler, - } - - workersCommand.Flags().BoolVarP( - &interactive, - "interactive", - "i", - false, - "render interactive workers table", - ) - - root.AddCommand(workersCommand) -} - -func workersHandler(_ *cobra.Command, args []string) error { - const op = errors.Op("handle_workers_command") - // get RPC client - client, err := RPCClient() - if err != nil { - return err - } - defer func() { - err := client.Close() - if err != nil { - log.Printf("error when closing RPCClient: error %v", err) - } - }() - - var plugins []string - // assume user wants to show workers from particular plugin - if len(args) != 0 { - plugins = args - } else { - err = client.Call(InformerList, true, &plugins) - if err != nil { - return errors.E(op, err) - } - } - - if !interactive { - return showWorkers(plugins, client) - } - - // https://golang.org/pkg/os/signal/#Notify - // should be of buffer size at least 1 - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGINT) - - tm.Clear() - tt := time.NewTicker(time.Second) - defer tt.Stop() - for { - select { - case <-c: - return nil - case <-tt.C: - tm.MoveCursor(1, 1) - err := showWorkers(plugins, client) - if err != nil { - return errors.E(op, err) - } - tm.Flush() - } - } -} - -func showWorkers(plugins []string, client *rpc.Client) error { - const op = errors.Op("show_workers") - for _, plugin := range plugins { - list := &informer.WorkerList{} - err := client.Call(InformerWorkers, plugin, &list) - if err != nil { - return errors.E(op, err) - } - - fmt.Printf("Workers of [%s]:\n", color.HiYellowString(plugin)) - tools.WorkerTable(os.Stdout, list.Workers).Render() - } - return nil -} diff --git a/cmd/main.go b/cmd/main.go deleted file mode 100644 index 8074f316..00000000 --- a/cmd/main.go +++ /dev/null @@ -1,69 +0,0 @@ -package main - -import ( - "log" - - endure "github.com/spiral/endure/pkg/container" - "github.com/spiral/roadrunner/v2/cmd/cli" - httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" - "github.com/spiral/roadrunner/v2/plugins/informer" - "github.com/spiral/roadrunner/v2/plugins/temporal/activity" - temporalClient "github.com/spiral/roadrunner/v2/plugins/temporal/client" - "github.com/spiral/roadrunner/v2/plugins/temporal/workflow" - - "github.com/spiral/roadrunner/v2/plugins/kv/boltdb" - "github.com/spiral/roadrunner/v2/plugins/kv/memcached" - "github.com/spiral/roadrunner/v2/plugins/kv/memory" - "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/plugins/metrics" - "github.com/spiral/roadrunner/v2/plugins/redis" - "github.com/spiral/roadrunner/v2/plugins/reload" - "github.com/spiral/roadrunner/v2/plugins/resetter" - "github.com/spiral/roadrunner/v2/plugins/rpc" - "github.com/spiral/roadrunner/v2/plugins/server" -) - -func main() { - var err error - cli.Container, err = endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel), endure.RetryOnFail(false)) - if err != nil { - log.Fatal(err) - } - - err = cli.Container.RegisterAll( - // logger plugin - &logger.ZapLogger{}, - // metrics plugin - &metrics.Plugin{}, - // redis plugin (internal) - &redis.Plugin{}, - // http server plugin - &httpPlugin.Plugin{}, - // reload plugin - &reload.Plugin{}, - // informer plugin (./rr workers, ./rr workers -i) - &informer.Plugin{}, - // resetter plugin (./rr reset) - &resetter.Plugin{}, - // rpc plugin (workers, reset) - &rpc.Plugin{}, - // server plugin (NewWorker, NewWorkerPool) - &server.Plugin{}, - // memcached kv plugin - &memcached.Plugin{}, - // in-memory kv plugin - &memory.Plugin{}, - // boltdb driver - &boltdb.Plugin{}, - - // temporal plugins - &temporalClient.Plugin{}, - &activity.Plugin{}, - &workflow.Plugin{}, - ) - if err != nil { - log.Fatal(err) - } - - cli.Execute() -} @@ -7,38 +7,28 @@ require ( github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect github.com/alicebob/miniredis/v2 v2.14.1 github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b - github.com/buger/goterm v0.0.0-20200322175922-2f3e71b85129 - github.com/cenkalti/backoff/v4 v4.1.0 github.com/dustin/go-humanize v1.0.0 github.com/fatih/color v1.10.0 github.com/go-ole/go-ole v1.2.5 // indirect github.com/go-redis/redis/v8 v8.4.10 github.com/gofiber/fiber/v2 v2.3.3 github.com/golang/mock v1.4.4 - github.com/golang/protobuf v1.4.3 github.com/hashicorp/go-multierror v1.1.0 github.com/json-iterator/go v1.1.10 - github.com/mattn/go-runewidth v0.0.10 github.com/olekukonko/tablewriter v0.0.4 - github.com/pborman/uuid v1.2.1 github.com/prometheus/client_golang v1.9.0 github.com/shirou/gopsutil v3.20.12+incompatible - github.com/spf13/cobra v1.1.1 github.com/spf13/viper v1.7.1 github.com/spiral/endure v1.0.0-beta21 github.com/spiral/errors v1.0.9 github.com/spiral/goridge/v3 v3.0.0 github.com/stretchr/testify v1.7.0 github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a - github.com/vbauerster/mpb/v5 v5.4.0 github.com/yookoala/gofast v0.4.0 go.etcd.io/bbolt v1.3.5 - go.temporal.io/api v1.4.0 - go.temporal.io/sdk v1.4.0 go.uber.org/multierr v1.6.0 go.uber.org/zap v1.16.0 golang.org/x/net v0.0.0-20201224014010-6772e930b67b golang.org/x/sync v0.0.0-20201207232520-09787c993a3a golang.org/x/sys v0.0.0-20210105210732-16f7687f5001 - google.golang.org/protobuf v1.25.0 ) @@ -76,6 +76,7 @@ github.com/clbanning/x2j v0.0.0-20191024224557-825249438eec/go.mod h1:jMjuTZXRI4 github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8= +github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd h1:qMd81Ts1T2OTKmB4acZcyKaMtRnY5Y44NuXGX2GFJ1w= github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI= github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= @@ -129,16 +130,13 @@ github.com/go-kit/kit v0.10.0/go.mod h1:xUsJbQ/Fp4kEt7AFgCuvyX4a71u8h9jB8tj/ORgO github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= -github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM= github.com/go-ole/go-ole v1.2.5 h1:t4MGB5xEDZvXI+0rMjjsfBsD7yAgp/s9ZDkL1JndXwY= github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= -github.com/go-redis/redis/v8 v8.4.4/go.mod h1:nA0bQuF0i5JFx4Ta9RZxGKXFrQ8cRWntra97f0196iY= github.com/go-redis/redis/v8 v8.4.10 h1:fWdl0RBmVibUDOp8bqz1e2Yy9dShOeIeWsiAifYk06Y= github.com/go-redis/redis/v8 v8.4.10/go.mod h1:d5yY/TlkQyYBSBHnXUmnf1OrHbyQere5JV4dLKwvXmo= github.com/go-restit/lzjson v0.0.0-20161206095556-efe3c53acc68/go.mod h1:7vXSKQt83WmbPeyVjCfNT9YDJ5BUFmcwFsEjI9SCvYM= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= -github.com/gofiber/fiber/v2 v2.3.0/go.mod h1:f8BRRIMjMdRyt2qmJ/0Sea3j3rwwfufPrh9WNBRiVZ0= github.com/gofiber/fiber/v2 v2.3.3 h1:nsjc9TfCl+ojXgEAu+uAT1Le7iQtZJ+Gfb/ox6+BM4w= github.com/gofiber/fiber/v2 v2.3.3/go.mod h1:f8BRRIMjMdRyt2qmJ/0Sea3j3rwwfufPrh9WNBRiVZ0= github.com/gogo/googleapis v0.0.0-20180223154316-0cd9801be74a/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s= @@ -272,6 +270,7 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM= github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4= @@ -288,6 +287,7 @@ github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNx github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= +github.com/mattn/go-runewidth v0.0.7 h1:Ei8KR0497xHyKJPAv59M1dkC+rOZCMBJ+t3fZ+twI54= github.com/mattn/go-runewidth v0.0.7/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0= github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= @@ -320,6 +320,7 @@ github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzE github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= @@ -454,12 +455,8 @@ github.com/spf13/viper v1.7.0 h1:xVKxvI7ouOI5I+U9s2eeiUfMaWBVoXA3AWskkrqK0VM= github.com/spf13/viper v1.7.0/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg= github.com/spf13/viper v1.7.1 h1:pM5oEahlgWv/WnHXpgbKz7iLIxRf65tye2Ci+XFK5sk= github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg= -github.com/spiral/endure v1.0.0-beta20/go.mod h1:qCU2/4gAItVESzUK0yPExmUTlTcpRLqJUgcV+nqxn+o= github.com/spiral/endure v1.0.0-beta21 h1:YW3gD6iNhRByG/yFkm/Ko+nj+oTBsjBtPVHFA2nt67k= github.com/spiral/endure v1.0.0-beta21/go.mod h1:GsItn+dYSO4O5uwvfki23xyxRnmBhxEyL6jBeJQoFPw= -github.com/spiral/endure v1.0.0-beta9 h1:PNiVit9DCucmhZLd4RgoiVL7Y5DBmweUwadFyulAct8= -github.com/spiral/endure v1.0.0-beta9/go.mod h1:EhC6CKaSQum/gz1zRqkyu4LqFOlngVTGbXK69pebmxQ= -github.com/spiral/errors v1.0.4/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o= github.com/spiral/errors v1.0.5 h1:TwlR9cZtTgnZrSngcEUpyiMO9yJ45gdQ+XcrCRoCCAM= github.com/spiral/errors v1.0.5/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o= github.com/spiral/errors v1.0.7/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o= @@ -488,7 +485,9 @@ github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1 github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/uber-go/tally v3.3.17+incompatible h1:nFHIuW3VQ22wItiE9kPXic8dEgExWOsVOHwpmoIvsMw= github.com/uber-go/tally v3.3.17+incompatible/go.mod h1:YDTIBxdXyOU/sCWilKB4bgyufu1cEi0jdVnRdxvjnmU= +github.com/uber/jaeger-client-go v2.23.1+incompatible h1:uArBYHQR0HqLFFAypI7RsWTzPSj/bDpmZZuQjMLSg1A= github.com/uber/jaeger-client-go v2.23.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk= +github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/GfSYVCjK7dyaw= github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= @@ -506,7 +505,6 @@ github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q github.com/yookoala/gofast v0.4.0 h1:dLBjghcsbbZNOEHN8N1X/gh9S6srmJed4WQfG7DlKwo= github.com/yookoala/gofast v0.4.0/go.mod h1:rfbkoKaQG1bnuTUZcmV3vAlnfpF4FTq8WbQJf2vcpg8= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/gopher-lua v0.0.0-20191220021717-ab39c6098bdb h1:ZkM6LRnq40pR1Ox0hTHlnpkcOTuFIDQpZ1IN8rKKhX0= github.com/yuin/gopher-lua v0.0.0-20191220021717-ab39c6098bdb/go.mod h1:gqRgreBUhTSL0GeU64rtZ3Uq3wtjOa/TB2YfrtkCbVQ= go.etcd.io/bbolt v1.3.2 h1:Z/90sZLPOeCy2PwprqkFa25PdkusRzaj9P8zm/KNyvk= @@ -520,7 +518,6 @@ go.opencensus.io v0.20.2/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= -go.opentelemetry.io/otel v0.15.0/go.mod h1:e4GKElweB8W2gWUqbghw0B8t5MCTccc9212eNHnOHwA= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opentelemetry.io/otel v0.16.0 h1:uIWEbdeb4vpKPGITLsRVUS44L5oDbDUCZxn8lkxhmgw= go.opentelemetry.io/otel v0.16.0/go.mod h1:e4GKElweB8W2gWUqbghw0B8t5MCTccc9212eNHnOHwA= @@ -571,15 +568,15 @@ golang.org/x/lint v0.0.0-20190409202823-959b441ac422/go.mod h1:6SW0HCj/g11FgYtHl golang.org/x/lint v0.0.0-20190909230951-414d861bb4ac/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs= golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/lint v0.0.0-20200302205851-738671d3881b h1:Wh+f8QHJXR411sJR8/vRBTZ7YapZaRvUcLFFJhusH0k= golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= -golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE= golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o= golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY= golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= +golang.org/x/mod v0.2.0 h1:KU7oHjnv3XNWfa5COkzUifxZmxp1TyI7ImMXqFxLwvQ= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -603,7 +600,6 @@ golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201016165138-7b1cca2348c0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201216054612-986b41b23924 h1:QsnDpLLOKwHBBDa8nDws4DYNc/ryVW2vCpxCs09d4PY= golang.org/x/net v0.0.0-20201216054612-986b41b23924/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= @@ -619,7 +615,6 @@ golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -712,12 +707,10 @@ golang.org/x/tools v0.0.0-20200103221440-774c71fcf114/go.mod h1:TB2adYChydJhpapK golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20200605181038-cef9fc3bc8f0 h1:gxU2P+MOOGAWge5BKP+BzqSeegxvDBRib5rk3yZDDuI= golang.org/x/tools v0.0.0-20200605181038-cef9fc3bc8f0/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= -golang.org/x/tools v0.0.0-20210115202250-e0d201561e39/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/api v0.3.1/go.mod h1:6wY9I6uQWHQ8EM57III9mq/AjF+i8G65rmVagqKMtkk= google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE= google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M= @@ -780,6 +773,7 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/cheggaaa/pb.v1 v1.0.25/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= diff --git a/plugins/temporal/activity/activity_pool.go b/plugins/temporal/activity/activity_pool.go deleted file mode 100644 index d09722ce..00000000 --- a/plugins/temporal/activity/activity_pool.go +++ /dev/null @@ -1,197 +0,0 @@ -package activity - -import ( - "context" - "sync" - "sync/atomic" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/events" - "github.com/spiral/roadrunner/v2/pkg/pool" - rrWorker "github.com/spiral/roadrunner/v2/pkg/worker" - "github.com/spiral/roadrunner/v2/plugins/server" - "github.com/spiral/roadrunner/v2/plugins/temporal/client" - rrt "github.com/spiral/roadrunner/v2/plugins/temporal/protocol" - "go.temporal.io/api/common/v1" - "go.temporal.io/sdk/activity" - "go.temporal.io/sdk/converter" - "go.temporal.io/sdk/internalbindings" - "go.temporal.io/sdk/worker" -) - -// RR_MODE env variable -const RR_MODE = "RR_MODE" //nolint:golint,stylecheck -// RR_CODEC env variable -const RR_CODEC = "RR_CODEC" //nolint:golint,stylecheck - -// -const doNotCompleteOnReturn = "doNotCompleteOnReturn" - -type activityPool interface { - Start(ctx context.Context, temporal client.Temporal) error - Destroy(ctx context.Context) error - Workers() []rrWorker.SyncWorker - ActivityNames() []string - GetActivityContext(taskToken []byte) (context.Context, error) -} - -type activityPoolImpl struct { - dc converter.DataConverter - codec rrt.Codec - seqID uint64 - activities []string - wp pool.Pool - tWorkers []worker.Worker - running sync.Map -} - -// newActivityPool -func newActivityPool(codec rrt.Codec, listener events.Listener, poolConfig pool.Config, server server.Server) (activityPool, error) { - const op = errors.Op("new_activity_pool") - // env variables - env := map[string]string{RR_MODE: RRMode, RR_CODEC: codec.GetName()} - wp, err := server.NewWorkerPool(context.Background(), poolConfig, env, listener) - if err != nil { - return nil, errors.E(op, err) - } - - return &activityPoolImpl{ - codec: codec, - wp: wp, - running: sync.Map{}, - }, nil -} - -// initWorkers request workers info from underlying PHP and configures temporal workers linked to the pool. -func (pool *activityPoolImpl) Start(ctx context.Context, temporal client.Temporal) error { - const op = errors.Op("activity_pool_start") - pool.dc = temporal.GetDataConverter() - - err := pool.initWorkers(ctx, temporal) - if err != nil { - return errors.E(op, err) - } - - for i := 0; i < len(pool.tWorkers); i++ { - err := pool.tWorkers[i].Start() - if err != nil { - return errors.E(op, err) - } - } - - return nil -} - -// initWorkers request workers info from underlying PHP and configures temporal workers linked to the pool. -func (pool *activityPoolImpl) Destroy(ctx context.Context) error { - for i := 0; i < len(pool.tWorkers); i++ { - pool.tWorkers[i].Stop() - } - - pool.wp.Destroy(ctx) - return nil -} - -// Workers returns list of all allocated workers. -func (pool *activityPoolImpl) Workers() []rrWorker.SyncWorker { - return pool.wp.Workers() -} - -// ActivityNames returns list of all available activity names. -func (pool *activityPoolImpl) ActivityNames() []string { - return pool.activities -} - -// ActivityNames returns list of all available activity names. -func (pool *activityPoolImpl) GetActivityContext(taskToken []byte) (context.Context, error) { - const op = errors.Op("activity_pool_get_activity_context") - c, ok := pool.running.Load(string(taskToken)) - if !ok { - return nil, errors.E(op, errors.Str("heartbeat on non running activity")) - } - - return c.(context.Context), nil -} - -// initWorkers request workers workflows from underlying PHP and configures temporal workers linked to the pool. -func (pool *activityPoolImpl) initWorkers(ctx context.Context, temporal client.Temporal) error { - const op = errors.Op("activity_pool_create_temporal_worker") - - workerInfo, err := rrt.FetchWorkerInfo(pool.codec, pool.wp, temporal.GetDataConverter()) - if err != nil { - return errors.E(op, err) - } - - pool.activities = make([]string, 0) - pool.tWorkers = make([]worker.Worker, 0) - - for i := 0; i < len(workerInfo); i++ { - w, err := temporal.CreateWorker(workerInfo[i].TaskQueue, workerInfo[i].Options) - if err != nil { - return errors.E(op, err, pool.Destroy(ctx)) - } - - pool.tWorkers = append(pool.tWorkers, w) - for j := 0; j < len(workerInfo[i].Activities); j++ { - w.RegisterActivityWithOptions(pool.executeActivity, activity.RegisterOptions{ - Name: workerInfo[i].Activities[j].Name, - DisableAlreadyRegisteredCheck: false, - }) - - pool.activities = append(pool.activities, workerInfo[i].Activities[j].Name) - } - } - - return nil -} - -// executes activity with underlying worker. -func (pool *activityPoolImpl) executeActivity(ctx context.Context, args *common.Payloads) (*common.Payloads, error) { - const op = errors.Op("activity_pool_execute_activity") - - heartbeatDetails := &common.Payloads{} - if activity.HasHeartbeatDetails(ctx) { - err := activity.GetHeartbeatDetails(ctx, &heartbeatDetails) - if err != nil { - return nil, errors.E(op, err) - } - } - - var info = activity.GetInfo(ctx) - var msg = rrt.Message{ - ID: atomic.AddUint64(&pool.seqID, 1), - Command: rrt.InvokeActivity{ - Name: info.ActivityType.Name, - Info: info, - HeartbeatDetails: len(heartbeatDetails.Payloads), - }, - Payloads: args, - } - - if len(heartbeatDetails.Payloads) != 0 { - msg.Payloads.Payloads = append(msg.Payloads.Payloads, heartbeatDetails.Payloads...) - } - - pool.running.Store(string(info.TaskToken), ctx) - defer pool.running.Delete(string(info.TaskToken)) - - result, err := pool.codec.Execute(pool.wp, rrt.Context{TaskQueue: info.TaskQueue}, msg) - if err != nil { - return nil, errors.E(op, err) - } - - if len(result) != 1 { - return nil, errors.E(op, errors.Str("invalid activity worker response")) - } - - out := result[0] - if out.Failure != nil { - if out.Failure.Message == doNotCompleteOnReturn { - return nil, activity.ErrResultPending - } - - return nil, internalbindings.ConvertFailureToError(out.Failure, pool.dc) - } - - return out.Payloads, nil -} diff --git a/plugins/temporal/activity/plugin.go b/plugins/temporal/activity/plugin.go deleted file mode 100644 index 5e562a8d..00000000 --- a/plugins/temporal/activity/plugin.go +++ /dev/null @@ -1,215 +0,0 @@ -package activity - -import ( - "context" - "time" - - "github.com/cenkalti/backoff/v4" - "github.com/spiral/roadrunner/v2/pkg/events" - "github.com/spiral/roadrunner/v2/pkg/worker" - - "sync" - "sync/atomic" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/plugins/server" - "github.com/spiral/roadrunner/v2/plugins/temporal/client" -) - -const ( - // PluginName defines public service name. - PluginName = "activities" - - // RRMode sets as RR_MODE env variable to let worker know about the mode to run. - RRMode = "temporal/activity" -) - -// Plugin to manage activity execution. -type Plugin struct { - temporal client.Temporal - events events.Handler - server server.Server - log logger.Logger - mu sync.Mutex - reset chan struct{} - pool activityPool - closing int64 -} - -// Init configures activity service. -func (p *Plugin) Init(temporal client.Temporal, server server.Server, log logger.Logger) error { - const op = errors.Op("activity_plugin_init") - if temporal.GetConfig().Activities == nil { - // no need to serve activities - return errors.E(op, errors.Disabled) - } - - p.temporal = temporal - p.server = server - p.events = events.NewEventsHandler() - p.log = log - p.reset = make(chan struct{}) - - return nil -} - -// Serve activities with underlying workers. -func (p *Plugin) Serve() chan error { - const op = errors.Op("activity_plugin_serve") - - errCh := make(chan error, 1) - pool, err := p.startPool() - if err != nil { - errCh <- errors.E(op, err) - return errCh - } - - p.pool = pool - - go func() { - for { - select { - case <-p.reset: - if atomic.LoadInt64(&p.closing) == 1 { - return - } - - err := p.replacePool() - if err == nil { - continue - } - - bkoff := backoff.NewExponentialBackOff() - bkoff.InitialInterval = time.Second - - err = backoff.Retry(p.replacePool, bkoff) - if err != nil { - errCh <- errors.E(op, err) - } - } - } - }() - - return errCh -} - -// Stop stops the serving plugin. -func (p *Plugin) Stop() error { - atomic.StoreInt64(&p.closing, 1) - const op = errors.Op("activity_plugin_stop") - - pool := p.getPool() - if pool != nil { - p.pool = nil - err := pool.Destroy(context.Background()) - if err != nil { - return errors.E(op, err) - } - return nil - } - - return nil -} - -// Name of the service. -func (p *Plugin) Name() string { - return PluginName -} - -// RPC returns associated rpc service. -func (p *Plugin) RPC() interface{} { - return &rpc{srv: p, client: p.temporal.GetClient()} -} - -// Workers returns pool workers. -func (p *Plugin) Workers() []worker.SyncWorker { - return p.getPool().Workers() -} - -// ActivityNames returns list of all available activities. -func (p *Plugin) ActivityNames() []string { - return p.pool.ActivityNames() -} - -// Reset resets underlying workflow pool with new copy. -func (p *Plugin) Reset() error { - p.reset <- struct{}{} - - return nil -} - -// AddListener adds event listeners to the service. -func (p *Plugin) AddListener(listener events.Listener) { - p.events.AddListener(listener) -} - -// AddListener adds event listeners to the service. -func (p *Plugin) poolListener(event interface{}) { - if ev, ok := event.(events.PoolEvent); ok { - if ev.Event == events.EventPoolError { - p.log.Error("Activity pool error", "error", ev.Payload.(error)) - p.reset <- struct{}{} - } - } - - p.events.Push(event) -} - -// AddListener adds event listeners to the service. -func (p *Plugin) startPool() (activityPool, error) { - pool, err := newActivityPool( - p.temporal.GetCodec().WithLogger(p.log), - p.poolListener, - *p.temporal.GetConfig().Activities, - p.server, - ) - - if err != nil { - return nil, errors.E(errors.Op("newActivityPool"), err) - } - - err = pool.Start(context.Background(), p.temporal) - if err != nil { - return nil, errors.E(errors.Op("startActivityPool"), err) - } - - p.log.Debug("Started activity processing", "activities", pool.ActivityNames()) - - return pool, nil -} - -func (p *Plugin) replacePool() error { - pool, err := p.startPool() - if err != nil { - p.log.Error("Replace activity pool failed", "error", err) - return errors.E(errors.Op("newActivityPool"), err) - } - - p.log.Debug("Replace activity pool") - - var previous activityPool - - p.mu.Lock() - previous, p.pool = p.pool, pool - p.mu.Unlock() - - errD := previous.Destroy(context.Background()) - if errD != nil { - p.log.Error( - "Unable to destroy expired activity pool", - "error", - errors.E(errors.Op("destroyActivityPool"), errD), - ) - } - - return nil -} - -// getPool returns currently pool. -func (p *Plugin) getPool() activityPool { - p.mu.Lock() - defer p.mu.Unlock() - - return p.pool -} diff --git a/plugins/temporal/activity/rpc.go b/plugins/temporal/activity/rpc.go deleted file mode 100644 index 49efcd4f..00000000 --- a/plugins/temporal/activity/rpc.go +++ /dev/null @@ -1,66 +0,0 @@ -package activity - -import ( - v1Proto "github.com/golang/protobuf/proto" //nolint:staticcheck - commonpb "go.temporal.io/api/common/v1" - "go.temporal.io/sdk/activity" - "go.temporal.io/sdk/client" - "google.golang.org/protobuf/proto" -) - -/* -- the method's type is exported. -- the method is exported. -- the method has two arguments, both exported (or builtin) types. -- the method's second argument is a pointer. -- the method has return type error. -*/ -type rpc struct { - srv *Plugin - client client.Client -} - -// RecordHeartbeatRequest sent by activity to record current state. -type RecordHeartbeatRequest struct { - TaskToken []byte `json:"taskToken"` - Details []byte `json:"details"` -} - -// RecordHeartbeatResponse sent back to the worker to indicate that activity was cancelled. -type RecordHeartbeatResponse struct { - Canceled bool `json:"canceled"` -} - -// RecordActivityHeartbeat records heartbeat for an activity. -// taskToken - is the value of the binary "TaskToken" field of the "ActivityInfo" struct retrieved inside the activity. -// details - is the progress you want to record along with heart beat for this activity. -// The errors it can return: -// - EntityNotExistsError -// - InternalServiceError -// - CanceledError -func (r *rpc) RecordActivityHeartbeat(in RecordHeartbeatRequest, out *RecordHeartbeatResponse) error { - details := &commonpb.Payloads{} - - if len(in.Details) != 0 { - if err := proto.Unmarshal(in.Details, v1Proto.MessageV2(details)); err != nil { - return err - } - } - - // find running activity - ctx, err := r.srv.getPool().GetActivityContext(in.TaskToken) - if err != nil { - return err - } - - activity.RecordHeartbeat(ctx, details) - - select { - case <-ctx.Done(): - *out = RecordHeartbeatResponse{Canceled: true} - default: - *out = RecordHeartbeatResponse{Canceled: false} - } - - return nil -} diff --git a/plugins/temporal/client/doc/doc.go b/plugins/temporal/client/doc/doc.go deleted file mode 100644 index 10257070..00000000 --- a/plugins/temporal/client/doc/doc.go +++ /dev/null @@ -1 +0,0 @@ -package doc diff --git a/plugins/temporal/client/doc/temporal.drawio b/plugins/temporal/client/doc/temporal.drawio deleted file mode 100644 index f2350af8..00000000 --- a/plugins/temporal/client/doc/temporal.drawio +++ /dev/null @@ -1 +0,0 @@ -<mxfile host="Electron" modified="2020-10-20T11:17:09.390Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/13.7.9 Chrome/85.0.4183.121 Electron/10.1.3 Safari/537.36" etag="U-Sm3BmbD4KaqDkjKEOx" version="13.7.9" type="device"><diagram name="Page-1" id="6133507b-19e7-1e82-6fc7-422aa6c4b21f">7V1tc6M2EP41nsl1Jh5eDIaPsZPcdXptc83N5dpvspFtNRhRkGP7fn0lkDAgSEgCxrkoH2K0SEJon919VrwNzOl69zEC4ep37EF/YGjebmBeDgxD17UR/WGSPZdohpVKlhHyuOwguEU/oKjIpRvkwbhQkWDsExQWhXMcBHBOCjIQRXhbrLbAfvGoIVhCSXA7B74svUMeWaVSwzTtw45PEC1X4tC2xU95Bub3ywhvAn7AgWEukr909xqIzviZxivg4W1OZF4NzGmEMUm31rsp9NnsinlL213X7M0GHsGANGkwc0ezuW46c+A6GlycO3xYZC/mAnp0angRR2SFlzgA/tVBOklOF7IONVpakbVPN3W66YMZ9CfZjEyxjyO6K8ABaxYTEJELpq2S7Br5Pu8sKXN8OLQMA6/Ugkpy9WmJ19ZtWlzggFyDNfIZEL/ByAMB4GJRjc74RJ40Po8x3kRzPg9jx7V13XIXmg1dBzrnI5MjE0RLSGoq6eO0EpvFXNdcGR8hXkMS7WmFCPqAoIciAAHH8TKrx5vSSQD7XIUQo4DEuZ5vmIBWyGxyzAHHTdLR3CIuntmAbqRjEKXcyRxECdiaAU/nHuIB+Bs+CdpwqEtwJHBHiiiLYIx+gFlSgWGAzwWtbU0G1iWVAB8tAyqYU/1CisDJA4wIouZ+wXeskeclUK5B7MGKZfBUYkwCVI2dsYHAXUHn3KfxEyq4hQJgeKtzbWiNR1ZBV+cO91PPA5UEAqcIgXGpA7xYxBT4RVS8Gge2hAMFgqdAoA0dV3gagQG9FQhkdpl16x4BBO6biUIeAmsceF9XKOg1GDlGg2AkIlZ7waig8+dYuStb+XD4y89q6OJs2/D2zsjuxNKFSvaV7bvx9Zqk8FO1cxzCN2DgrnUqBi7ymyKd+2kNXCC5FQunarXaMWm95Ci0Ixi1GOZ7IHDtqV0b2q5b6YFfiQGjBAHjGH5dQsDAsH2m8LAAA/u/DVvnmKypT2P86YLu1cId/Z+4SS2VnxMcpvtGuX0MQeccEmwfR0XWJ91a8t/kyCgnAOuQoUQu/cr6WADqhIW42KTQI50VVD7KLJIkQvAVrkMcAT/XfFauTGVhWfaiKfPhguTmrG5S5OFWjWAVsSGIRTrRl17fbTdDZnU/QjL1EVX12QdaOpsn28NUNDCmLPTRgB19qDpB0R4HC7RM2qebNXW5yoNZzH6mEQQE3uHoHkZnBMT3XzaQgpudZ4SCZXpsHBKEg5hubZOKwz9TQTJWLkq7qBprcepLDrPs6xKbyFxiOncTTGst/ISvLBKW8ribm1K+gdhYtD/gtuSNCyQrW6RkhTleoznffopyRfg+W0DV+eFz3pj9XV8/Rokkx1rrQEduabGCF7eH9VtLrGes8ku35aD4EvZjmJZl2WPHAI4JbU+r8IBTH8RxANZQUm28RWsfJDOW00CiOV5JbxL9UkTMV8j3PoM93rDzo4R2fi9KkxWO0A/aLRDHyPNdwy7UuGUtuZaT0AxvhHayaA0/g5hkqPB9EMZolg04NesJJgSvBY74mTIKXRWUu0Jd7nC8UiOq/npQ2qMiKHVNXCfIwVK3q2BpOx3A0qgIzMwtLhBkV0iY8yX7UIaoTNfSWS7PqjzRJQ9VDdo4BHPqRD8ndS5HB8lffEKYaLtCBN6GIMmQthEIiw5vRclfkqJFmABSSyQndK6n2tBilJL6YIuFMVFOWCaN0SxE0PMDKNE7pCDfwvjglfOIqDX8pyHCIWHYzQAh6rWKB1Ph4dTwYBk94kG+BqPw0DMexk6PeJCvxUiq91GZuTQKwE/oPUvluaK/MhxcnusSGEwZDGaFkhPKcINjxPg4lUVp3YpVhmPrN7tZ4ikFd8EHxjX2TnteYe+M2foHZfa9wEI/YhyovWtB5S8qfykk1ZacVVenL3oL7kqCpaH4asuOqt7yTzKBkRExUog4OUQcM4WRhyvfWaA4a9sabpqUdBIEFGk9Wct3+owFI5kdKNL6DkmrbozLrLXxovuoC1wqjtK1p8pM/22w1pFaZz09RPTKWq2qW2YUa21Xw32yVoEuxVpPz/KPyVqluyUVLE4VFkddgi/jom6lU+Gif1yYR7wkK43Xlte3VJb7DrNcxy4muc1vLXONDmApUm6V0nTmqTLLfxtJ7rhqnValNO1quGnS2kVKI55IU5HofUciab2VhiKzGSz1URexyFELrl3Hosz230YscqouCitE9IuIXhdc3Sq+qthJuxruc8HVrQsCagWld8vvc8FVweJkYdHvPc+ufFFWpTPvMZ2x9JenMy28SUUGZtVTOIq8tumrMtt/G+mMrslLLwoSfUOi39uetaorhiqhaVfFfSY02YOCirqenu33mdHU3UyoYNE7LHp+ilNzJGiolOY9pjSa+9KbBfTy+35bQqZakO/86R2t7vXSJ5rU6OoWkhPERL9ZTcXb4FVW07qOe01rKt41ofjrqVj/MfOai5vb8fe7y4urL1eju+Cf6NO33+yKFylezAl6QGR/C6MH9vLKehr7FrhrKvod7AoVn8toH32tZxsE0ijxR4GBHC6yu4AK9NFswWdUAqPBCujhjbsaOrzYt9p9HN9nHNk/PAqRoqVIeHnUMl/DLisx05kvafDUZfp23mswJzgZusJN/7ipYKDHxU2D628iKN1grNzNacCmgtQeFzbNL8j8vPnKM4lJO5qveqVipeq74ibtXKh53z6iM3BULMkf1y+8/hObhzqfcWLmzMr/hYQmRWkmAjYEF3NiuEPke277b9YV1VNauhTpR1LYi0JAT/d7vpC2MixRPrRLSqLhy9KR/OePHqP8+S8g1Xx5pPPvH9Hi4Rut6RdTDp/CNa/+Bw==</diagram></mxfile>
\ No newline at end of file diff --git a/plugins/temporal/client/plugin.go b/plugins/temporal/client/plugin.go deleted file mode 100644 index 047a1815..00000000 --- a/plugins/temporal/client/plugin.go +++ /dev/null @@ -1,169 +0,0 @@ -package client - -import ( - "fmt" - "os" - "sync/atomic" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pool" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/logger" - rrt "github.com/spiral/roadrunner/v2/plugins/temporal/protocol" - "go.temporal.io/sdk/client" - "go.temporal.io/sdk/converter" - "go.temporal.io/sdk/worker" -) - -// PluginName defines public service name. -const PluginName = "temporal" - -// indicates that the case size was set -var stickyCacheSet = false - -// Plugin implement Temporal contract. -type Plugin struct { - workerID int32 - cfg *Config - dc converter.DataConverter - log logger.Logger - client client.Client -} - -// Temporal define common interface for RoadRunner plugins. -type Temporal interface { - GetClient() client.Client - GetDataConverter() converter.DataConverter - GetConfig() Config - GetCodec() rrt.Codec - CreateWorker(taskQueue string, options worker.Options) (worker.Worker, error) -} - -// Config of the temporal client and depended services. -type Config struct { - Address string - Namespace string - Activities *pool.Config - Codec string - DebugLevel int `mapstructure:"debug_level"` - CacheSize int `mapstructure:"cache_size"` -} - -// Init initiates temporal client plugin. -func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { - const op = errors.Op("temporal_client_plugin_init") - p.log = log - p.dc = rrt.NewDataConverter(converter.GetDefaultDataConverter()) - err := cfg.UnmarshalKey(PluginName, &p.cfg) - if err != nil { - return errors.E(op, err) - } - if p.cfg == nil { - return errors.E(op, errors.Disabled) - } - - return nil -} - -// GetConfig returns temporal configuration. -func (p *Plugin) GetConfig() Config { - if p.cfg != nil { - return *p.cfg - } - // empty - return Config{} -} - -// GetCodec returns communication codec. -func (p *Plugin) GetCodec() rrt.Codec { - if p.cfg.Codec == "json" { - return rrt.NewJSONCodec(rrt.DebugLevel(p.cfg.DebugLevel), p.log) - } - - // production ready protocol, no debug abilities - return rrt.NewProtoCodec() -} - -// GetDataConverter returns data active data converter. -func (p *Plugin) GetDataConverter() converter.DataConverter { - return p.dc -} - -// Serve starts temporal srv. -func (p *Plugin) Serve() chan error { - const op = errors.Op("temporal_client_plugin_serve") - errCh := make(chan error, 1) - var err error - - if stickyCacheSet == false && p.cfg.CacheSize != 0 { - worker.SetStickyWorkflowCacheSize(p.cfg.CacheSize) - stickyCacheSet = true - } - - p.client, err = client.NewClient(client.Options{ - Logger: p.log, - HostPort: p.cfg.Address, - Namespace: p.cfg.Namespace, - DataConverter: p.dc, - }) - - if err != nil { - errCh <- errors.E(op, err) - } - - p.log.Debug("connected to temporal server", "address", p.cfg.Address) - - return errCh -} - -// Stop stops temporal srv connection. -func (p *Plugin) Stop() error { - if p.client != nil { - p.client.Close() - } - - return nil -} - -// GetClient returns active srv connection. -func (p *Plugin) GetClient() client.Client { - return p.client -} - -// CreateWorker allocates new temporal worker on an active connection. -func (p *Plugin) CreateWorker(tq string, options worker.Options) (worker.Worker, error) { - const op = errors.Op("temporal_client_plugin_create_worker") - if p.client == nil { - return nil, errors.E(op, errors.Str("unable to create worker, invalid temporal client")) - } - - if options.Identity == "" { - if tq == "" { - tq = client.DefaultNamespace - } - - // ensures unique worker IDs - options.Identity = fmt.Sprintf( - "%d@%s@%s@%v", - os.Getpid(), - getHostName(), - tq, - atomic.AddInt32(&p.workerID, 1), - ) - } - - return worker.New(p.client, tq, options), nil -} - -// Name of the service. -func (p *Plugin) Name() string { - return PluginName -} - -func getHostName() string { - hostName, err := os.Hostname() - if err != nil { - hostName = "Unknown" - } - return hostName -} diff --git a/plugins/temporal/protocol/converter.go b/plugins/temporal/protocol/converter.go deleted file mode 100644 index 406e70f4..00000000 --- a/plugins/temporal/protocol/converter.go +++ /dev/null @@ -1,76 +0,0 @@ -package protocol - -import ( - commonpb "go.temporal.io/api/common/v1" - "go.temporal.io/sdk/converter" -) - -type ( - // DataConverter wraps Temporal data converter to enable direct access to the payloads. - DataConverter struct { - fallback converter.DataConverter - } -) - -// NewDataConverter creates new data converter. -func NewDataConverter(fallback converter.DataConverter) converter.DataConverter { - return &DataConverter{fallback: fallback} -} - -// ToPayloads converts a list of values. -func (r *DataConverter) ToPayloads(values ...interface{}) (*commonpb.Payloads, error) { - for _, v := range values { - if aggregated, ok := v.(*commonpb.Payloads); ok { - // bypassing - return aggregated, nil - } - } - - return r.fallback.ToPayloads(values...) -} - -// ToPayload converts single value to payload. -func (r *DataConverter) ToPayload(value interface{}) (*commonpb.Payload, error) { - return r.fallback.ToPayload(value) -} - -// FromPayloads converts to a list of values of different types. -// Useful for deserializing arguments of function invocations. -func (r *DataConverter) FromPayloads(payloads *commonpb.Payloads, valuePtrs ...interface{}) error { - if payloads == nil { - return nil - } - - if len(valuePtrs) == 1 { - // input proxying - if input, ok := valuePtrs[0].(**commonpb.Payloads); ok { - *input = &commonpb.Payloads{} - (*input).Payloads = payloads.Payloads - return nil - } - } - - for i := 0; i < len(payloads.Payloads); i++ { - err := r.FromPayload(payloads.Payloads[i], valuePtrs[i]) - if err != nil { - return err - } - } - - return nil -} - -// FromPayload converts single value from payload. -func (r *DataConverter) FromPayload(payload *commonpb.Payload, valuePtr interface{}) error { - return r.fallback.FromPayload(payload, valuePtr) -} - -// ToString converts payload object into human readable string. -func (r *DataConverter) ToString(input *commonpb.Payload) string { - return r.fallback.ToString(input) -} - -// ToStrings converts payloads object into human readable strings. -func (r *DataConverter) ToStrings(input *commonpb.Payloads) []string { - return r.fallback.ToStrings(input) -} diff --git a/plugins/temporal/protocol/converter_test.go b/plugins/temporal/protocol/converter_test.go deleted file mode 100644 index 6ce9fa0f..00000000 --- a/plugins/temporal/protocol/converter_test.go +++ /dev/null @@ -1,23 +0,0 @@ -package protocol - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "go.temporal.io/api/common/v1" - "go.temporal.io/sdk/converter" -) - -func Test_Passthough(t *testing.T) { - codec := NewDataConverter(converter.GetDefaultDataConverter()) - - value, err := codec.ToPayloads("test") - assert.NoError(t, err) - - out := &common.Payloads{} - - assert.Len(t, out.Payloads, 0) - assert.NoError(t, codec.FromPayloads(value, &out)) - - assert.Len(t, out.Payloads, 1) -} diff --git a/plugins/temporal/protocol/internal/protocol.pb.go b/plugins/temporal/protocol/internal/protocol.pb.go deleted file mode 100644 index c554e28f..00000000 --- a/plugins/temporal/protocol/internal/protocol.pb.go +++ /dev/null @@ -1,167 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// source: protocol.proto - -package internal - -import ( - fmt "fmt" - math "math" - - proto "github.com/golang/protobuf/proto" - v11 "go.temporal.io/api/common/v1" - v1 "go.temporal.io/api/failure/v1" -) - -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = fmt.Errorf -var _ = math.Inf - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the proto package it is being compiled against. -// A compilation error at this line likely means your copy of the -// proto package needs to be updated. -const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package - -type Frame struct { - Messages []*Message `protobuf:"bytes,1,rep,name=messages,proto3" json:"messages,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *Frame) Reset() { *m = Frame{} } -func (m *Frame) String() string { return proto.CompactTextString(m) } -func (*Frame) ProtoMessage() {} -func (*Frame) Descriptor() ([]byte, []int) { - return fileDescriptor_2bc2336598a3f7e0, []int{0} -} - -func (m *Frame) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_Frame.Unmarshal(m, b) -} -func (m *Frame) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_Frame.Marshal(b, m, deterministic) -} -func (m *Frame) XXX_Merge(src proto.Message) { - xxx_messageInfo_Frame.Merge(m, src) -} -func (m *Frame) XXX_Size() int { - return xxx_messageInfo_Frame.Size(m) -} -func (m *Frame) XXX_DiscardUnknown() { - xxx_messageInfo_Frame.DiscardUnknown(m) -} - -var xxx_messageInfo_Frame proto.InternalMessageInfo - -func (m *Frame) GetMessages() []*Message { - if m != nil { - return m.Messages - } - return nil -} - -// Single communication message. -type Message struct { - Id uint64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` - // command name (if any) - Command string `protobuf:"bytes,2,opt,name=command,proto3" json:"command,omitempty"` - // command options in json format. - Options []byte `protobuf:"bytes,3,opt,name=options,proto3" json:"options,omitempty"` - // error response. - Failure *v1.Failure `protobuf:"bytes,4,opt,name=failure,proto3" json:"failure,omitempty"` - // invocation or result payloads. - Payloads *v11.Payloads `protobuf:"bytes,5,opt,name=payloads,proto3" json:"payloads,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *Message) Reset() { *m = Message{} } -func (m *Message) String() string { return proto.CompactTextString(m) } -func (*Message) ProtoMessage() {} -func (*Message) Descriptor() ([]byte, []int) { - return fileDescriptor_2bc2336598a3f7e0, []int{1} -} - -func (m *Message) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_Message.Unmarshal(m, b) -} -func (m *Message) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_Message.Marshal(b, m, deterministic) -} -func (m *Message) XXX_Merge(src proto.Message) { - xxx_messageInfo_Message.Merge(m, src) -} -func (m *Message) XXX_Size() int { - return xxx_messageInfo_Message.Size(m) -} -func (m *Message) XXX_DiscardUnknown() { - xxx_messageInfo_Message.DiscardUnknown(m) -} - -var xxx_messageInfo_Message proto.InternalMessageInfo - -func (m *Message) GetId() uint64 { - if m != nil { - return m.Id - } - return 0 -} - -func (m *Message) GetCommand() string { - if m != nil { - return m.Command - } - return "" -} - -func (m *Message) GetOptions() []byte { - if m != nil { - return m.Options - } - return nil -} - -func (m *Message) GetFailure() *v1.Failure { - if m != nil { - return m.Failure - } - return nil -} - -func (m *Message) GetPayloads() *v11.Payloads { - if m != nil { - return m.Payloads - } - return nil -} - -func init() { - proto.RegisterType((*Frame)(nil), "temporal.roadrunner.internal.Frame") - proto.RegisterType((*Message)(nil), "temporal.roadrunner.internal.Message") -} - -func init() { proto.RegisterFile("protocol.proto", fileDescriptor_2bc2336598a3f7e0) } - -var fileDescriptor_2bc2336598a3f7e0 = []byte{ - // 257 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x90, 0x41, 0x4b, 0x33, 0x31, - 0x10, 0x86, 0xc9, 0xb6, 0xfd, 0xb6, 0x5f, 0x2a, 0x3d, 0xe4, 0x14, 0x8a, 0x87, 0x50, 0x2c, 0xec, - 0x29, 0x4b, 0xeb, 0x4d, 0xbc, 0x78, 0xe9, 0x41, 0x10, 0x24, 0xff, 0x60, 0x6c, 0xa2, 0x04, 0x92, - 0x4c, 0x48, 0xb6, 0x05, 0x7f, 0xa1, 0x7f, 0x4b, 0xb6, 0x9b, 0xad, 0x88, 0xe2, 0x6d, 0x26, 0xef, - 0xfb, 0x84, 0x87, 0xa1, 0xcb, 0x98, 0xb0, 0xc3, 0x03, 0x3a, 0x79, 0x1e, 0xd8, 0x75, 0x67, 0x7c, - 0xc4, 0x04, 0x4e, 0x26, 0x04, 0x9d, 0x8e, 0x21, 0x98, 0x24, 0x6d, 0xe8, 0x4c, 0x0a, 0xe0, 0x56, - 0x37, 0x63, 0xda, 0x42, 0xb4, 0xed, 0x01, 0xbd, 0xc7, 0xd0, 0x9e, 0xb6, 0xad, 0x37, 0x39, 0xc3, - 0x9b, 0x19, 0xfe, 0x58, 0x6d, 0xbe, 0xb5, 0x5e, 0xc1, 0xba, 0x63, 0x32, 0x3f, 0x6a, 0xeb, 0x47, - 0x3a, 0xdb, 0x27, 0xf0, 0x86, 0x3d, 0xd0, 0x79, 0x49, 0x32, 0x27, 0x62, 0xd2, 0x2c, 0x76, 0x1b, - 0xf9, 0x97, 0x86, 0x7c, 0x1a, 0xda, 0xea, 0x82, 0xad, 0x3f, 0x08, 0xad, 0xcb, 0x2b, 0x5b, 0xd2, - 0xca, 0x6a, 0x4e, 0x04, 0x69, 0xa6, 0xaa, 0xb2, 0x9a, 0x71, 0x5a, 0xf7, 0xa6, 0x10, 0x34, 0xaf, - 0x04, 0x69, 0xfe, 0xab, 0x71, 0xed, 0x13, 0x8c, 0x9d, 0xc5, 0x90, 0xf9, 0x44, 0x90, 0xe6, 0x4a, - 0x8d, 0x2b, 0xbb, 0xa3, 0x75, 0xf1, 0xe6, 0x53, 0x41, 0x9a, 0xc5, 0x4e, 0x7c, 0x19, 0x41, 0xb4, - 0xb2, 0x84, 0xf2, 0xb4, 0x95, 0xfb, 0x61, 0x54, 0x23, 0xc0, 0xee, 0xe9, 0x3c, 0xc2, 0xbb, 0x43, - 0xd0, 0x99, 0xcf, 0x7e, 0x83, 0x87, 0xbb, 0xf5, 0xec, 0x73, 0xe9, 0xa9, 0x0b, 0xf1, 0xf2, 0xef, - 0x7c, 0x9c, 0xdb, 0xcf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x59, 0xb1, 0x79, 0xd4, 0x99, 0x01, 0x00, - 0x00, -} diff --git a/plugins/temporal/protocol/json_codec.go b/plugins/temporal/protocol/json_codec.go deleted file mode 100644 index e7a77068..00000000 --- a/plugins/temporal/protocol/json_codec.go +++ /dev/null @@ -1,225 +0,0 @@ -package protocol - -import ( - "github.com/fatih/color" - j "github.com/json-iterator/go" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/spiral/roadrunner/v2/plugins/logger" - "go.temporal.io/api/common/v1" - "go.temporal.io/api/failure/v1" -) - -var json = j.ConfigCompatibleWithStandardLibrary - -// JSONCodec can be used for debugging and log capturing reasons. -type JSONCodec struct { - // level enables verbose logging or all incoming and outcoming messages. - level DebugLevel - - // logger renders messages when debug enabled. - logger logger.Logger -} - -// jsonFrame contains message command in binary form. -type jsonFrame struct { - // ID contains ID of the command, response or error. - ID uint64 `json:"id"` - - // Command name. Optional. - Command string `json:"command,omitempty"` - - // Options to be unmarshalled to body (raw payload). - Options j.RawMessage `json:"options,omitempty"` - - // Failure associated with command id. - Failure []byte `json:"failure,omitempty"` - - // Payloads specific to the command or result. - Payloads []byte `json:"payloads,omitempty"` -} - -// NewJSONCodec creates new Json communication codec. -func NewJSONCodec(level DebugLevel, logger logger.Logger) Codec { - return &JSONCodec{ - level: level, - logger: logger, - } -} - -// WithLogger creates new codes instance with attached logger. -func (c *JSONCodec) WithLogger(logger logger.Logger) Codec { - return &JSONCodec{ - level: c.level, - logger: logger, - } -} - -// GetName returns codec name. -func (c *JSONCodec) GetName() string { - return "json" -} - -// Execute exchanges commands with worker. -func (c *JSONCodec) Execute(e Endpoint, ctx Context, msg ...Message) ([]Message, error) { - const op = errors.Op("json_codec_execute") - if len(msg) == 0 { - return nil, nil - } - - var response = make([]jsonFrame, 0, 5) - var result = make([]Message, 0, 5) - var err error - - frames := make([]jsonFrame, 0, len(msg)) - for _, m := range msg { - frame, err := c.packFrame(m) - if err != nil { - return nil, errors.E(op, err) - } - - frames = append(frames, frame) - } - - p := payload.Payload{} - - if ctx.IsEmpty() { - p.Context = []byte("null") - } - - p.Context, err = json.Marshal(ctx) - if err != nil { - return nil, errors.E(op, err) - } - - p.Body, err = json.Marshal(frames) - if err != nil { - return nil, errors.E(op, err) - } - - if c.level >= DebugNormal { - logMessage := string(p.Body) + " " + string(p.Context) - if c.level >= DebugHumanized { - logMessage = color.GreenString(logMessage) - } - - c.logger.Debug(logMessage) - } - - out, err := e.Exec(p) - if err != nil { - return nil, errors.E(op, err) - } - - if len(out.Body) == 0 { - // worker inactive or closed - return nil, nil - } - - if c.level >= DebugNormal { - logMessage := string(out.Body) - if c.level >= DebugHumanized { - logMessage = color.HiYellowString(logMessage) - } - - c.logger.Debug(logMessage, "receive", true) - } - - err = json.Unmarshal(out.Body, &response) - if err != nil { - return nil, errors.E(op, err) - } - - for _, f := range response { - msg, err := c.parseFrame(f) - if err != nil { - return nil, errors.E(op, err) - } - - result = append(result, msg) - } - - return result, nil -} - -func (c *JSONCodec) packFrame(msg Message) (jsonFrame, error) { - var ( - err error - frame jsonFrame - ) - - frame.ID = msg.ID - - if msg.Payloads != nil { - frame.Payloads, err = msg.Payloads.Marshal() - if err != nil { - return jsonFrame{}, err - } - } - - if msg.Failure != nil { - frame.Failure, err = msg.Failure.Marshal() - if err != nil { - return jsonFrame{}, err - } - } - - if msg.Command == nil { - return frame, nil - } - - frame.Command, err = commandName(msg.Command) - if err != nil { - return jsonFrame{}, err - } - - frame.Options, err = json.Marshal(msg.Command) - if err != nil { - return jsonFrame{}, err - } - - return frame, nil -} - -func (c *JSONCodec) parseFrame(frame jsonFrame) (Message, error) { - var ( - err error - msg Message - ) - - msg.ID = frame.ID - - if frame.Payloads != nil { - msg.Payloads = &common.Payloads{} - - err = msg.Payloads.Unmarshal(frame.Payloads) - if err != nil { - return Message{}, err - } - } - - if frame.Failure != nil { - msg.Failure = &failure.Failure{} - - err = msg.Failure.Unmarshal(frame.Failure) - if err != nil { - return Message{}, err - } - } - - if frame.Command != "" { - cmd, err := initCommand(frame.Command) - if err != nil { - return Message{}, err - } - - err = json.Unmarshal(frame.Options, &cmd) - if err != nil { - return Message{}, err - } - - msg.Command = cmd - } - - return msg, nil -} diff --git a/plugins/temporal/protocol/message.go b/plugins/temporal/protocol/message.go deleted file mode 100644 index d5e0f49d..00000000 --- a/plugins/temporal/protocol/message.go +++ /dev/null @@ -1,334 +0,0 @@ -package protocol - -import ( - "time" - - "github.com/spiral/errors" - commonpb "go.temporal.io/api/common/v1" - "go.temporal.io/sdk/activity" - bindings "go.temporal.io/sdk/internalbindings" - "go.temporal.io/sdk/workflow" -) - -const ( - getWorkerInfoCommand = "GetWorkerInfo" - - invokeActivityCommand = "InvokeActivity" - startWorkflowCommand = "StartWorkflow" - invokeSignalCommand = "InvokeSignal" - invokeQueryCommand = "InvokeQuery" - destroyWorkflowCommand = "DestroyWorkflow" - cancelWorkflowCommand = "CancelWorkflow" - getStackTraceCommand = "StackTrace" - - executeActivityCommand = "ExecuteActivity" - executeChildWorkflowCommand = "ExecuteChildWorkflow" - getChildWorkflowExecutionCommand = "GetChildWorkflowExecution" - - newTimerCommand = "NewTimer" - sideEffectCommand = "SideEffect" - getVersionCommand = "GetVersion" - completeWorkflowCommand = "CompleteWorkflow" - continueAsNewCommand = "ContinueAsNew" - - signalExternalWorkflowCommand = "SignalExternalWorkflow" - cancelExternalWorkflowCommand = "CancelExternalWorkflow" - - cancelCommand = "Cancel" - panicCommand = "Panic" -) - -// GetWorkerInfo reads worker information. -type GetWorkerInfo struct{} - -// InvokeActivity invokes activity. -type InvokeActivity struct { - // Name defines activity name. - Name string `json:"name"` - - // Info contains execution context. - Info activity.Info `json:"info"` - - // HeartbeatDetails indicates that the payload also contains last heartbeat details. - HeartbeatDetails int `json:"heartbeatDetails,omitempty"` -} - -// StartWorkflow sends worker command to start workflow. -type StartWorkflow struct { - // Info to define workflow context. - Info *workflow.Info `json:"info"` - - // LastCompletion contains offset of last completion results. - LastCompletion int `json:"lastCompletion,omitempty"` -} - -// InvokeSignal invokes signal with a set of arguments. -type InvokeSignal struct { - // RunID workflow run id. - RunID string `json:"runId"` - - // Name of the signal. - Name string `json:"name"` -} - -// InvokeQuery invokes query with a set of arguments. -type InvokeQuery struct { - // RunID workflow run id. - RunID string `json:"runId"` - // Name of the query. - Name string `json:"name"` -} - -// CancelWorkflow asks worker to gracefully stop workflow, if possible (signal). -type CancelWorkflow struct { - // RunID workflow run id. - RunID string `json:"runId"` -} - -// DestroyWorkflow asks worker to offload workflow from memory. -type DestroyWorkflow struct { - // RunID workflow run id. - RunID string `json:"runId"` -} - -// GetStackTrace asks worker to offload workflow from memory. -type GetStackTrace struct { - // RunID workflow run id. - RunID string `json:"runId"` -} - -// ExecuteActivity command by workflow worker. -type ExecuteActivity struct { - // Name defines activity name. - Name string `json:"name"` - // Options to run activity. - Options bindings.ExecuteActivityOptions `json:"options,omitempty"` -} - -// ExecuteChildWorkflow executes child workflow. -type ExecuteChildWorkflow struct { - // Name defines workflow name. - Name string `json:"name"` - // Options to run activity. - Options bindings.WorkflowOptions `json:"options,omitempty"` -} - -// GetChildWorkflowExecution returns the WorkflowID and RunId of child workflow. -type GetChildWorkflowExecution struct { - // ID of child workflow command. - ID uint64 `json:"id"` -} - -// NewTimer starts new timer. -type NewTimer struct { - // Milliseconds defines timer duration. - Milliseconds int `json:"ms"` -} - -// SideEffect to be recorded into the history. -type SideEffect struct{} - -// GetVersion requests version marker. -type GetVersion struct { - ChangeID string `json:"changeID"` - MinSupported int `json:"minSupported"` - MaxSupported int `json:"maxSupported"` -} - -// CompleteWorkflow sent by worker to complete workflow. Might include additional error as part of the payload. -type CompleteWorkflow struct{} - -// ContinueAsNew restarts workflow with new running instance. -type ContinueAsNew struct { - // Result defines workflow execution result. - Name string `json:"name"` - - // Options for continued as new workflow. - Options struct { - TaskQueueName string - WorkflowExecutionTimeout time.Duration - WorkflowRunTimeout time.Duration - WorkflowTaskTimeout time.Duration - } `json:"options"` -} - -// SignalExternalWorkflow sends signal to external workflow. -type SignalExternalWorkflow struct { - Namespace string `json:"namespace"` - WorkflowID string `json:"workflowID"` - RunID string `json:"runID"` - Signal string `json:"signal"` - ChildWorkflowOnly bool `json:"childWorkflowOnly"` -} - -// CancelExternalWorkflow canceller external workflow. -type CancelExternalWorkflow struct { - Namespace string `json:"namespace"` - WorkflowID string `json:"workflowID"` - RunID string `json:"runID"` -} - -// Cancel one or multiple internal promises (activities, local activities, timers, child workflows). -type Cancel struct { - // CommandIDs to be cancelled. - CommandIDs []uint64 `json:"ids"` -} - -// Panic triggers panic in workflow process. -type Panic struct { - // Message to include into the error. - Message string `json:"message"` -} - -// ActivityParams maps activity command to activity params. -func (cmd ExecuteActivity) ActivityParams(env bindings.WorkflowEnvironment, payloads *commonpb.Payloads) bindings.ExecuteActivityParams { - params := bindings.ExecuteActivityParams{ - ExecuteActivityOptions: cmd.Options, - ActivityType: bindings.ActivityType{Name: cmd.Name}, - Input: payloads, - } - - if params.TaskQueueName == "" { - params.TaskQueueName = env.WorkflowInfo().TaskQueueName - } - - return params -} - -// WorkflowParams maps workflow command to workflow params. -func (cmd ExecuteChildWorkflow) WorkflowParams(env bindings.WorkflowEnvironment, payloads *commonpb.Payloads) bindings.ExecuteWorkflowParams { - params := bindings.ExecuteWorkflowParams{ - WorkflowOptions: cmd.Options, - WorkflowType: &bindings.WorkflowType{Name: cmd.Name}, - Input: payloads, - } - - if params.TaskQueueName == "" { - params.TaskQueueName = env.WorkflowInfo().TaskQueueName - } - - return params -} - -// ToDuration converts timer command to time.Duration. -func (cmd NewTimer) ToDuration() time.Duration { - return time.Millisecond * time.Duration(cmd.Milliseconds) -} - -// returns command name (only for the commands sent to the worker) -func commandName(cmd interface{}) (string, error) { - const op = errors.Op("command_name") - switch cmd.(type) { - case GetWorkerInfo, *GetWorkerInfo: - return getWorkerInfoCommand, nil - case StartWorkflow, *StartWorkflow: - return startWorkflowCommand, nil - case InvokeSignal, *InvokeSignal: - return invokeSignalCommand, nil - case InvokeQuery, *InvokeQuery: - return invokeQueryCommand, nil - case DestroyWorkflow, *DestroyWorkflow: - return destroyWorkflowCommand, nil - case CancelWorkflow, *CancelWorkflow: - return cancelWorkflowCommand, nil - case GetStackTrace, *GetStackTrace: - return getStackTraceCommand, nil - case InvokeActivity, *InvokeActivity: - return invokeActivityCommand, nil - case ExecuteActivity, *ExecuteActivity: - return executeActivityCommand, nil - case ExecuteChildWorkflow, *ExecuteChildWorkflow: - return executeChildWorkflowCommand, nil - case GetChildWorkflowExecution, *GetChildWorkflowExecution: - return getChildWorkflowExecutionCommand, nil - case NewTimer, *NewTimer: - return newTimerCommand, nil - case GetVersion, *GetVersion: - return getVersionCommand, nil - case SideEffect, *SideEffect: - return sideEffectCommand, nil - case CompleteWorkflow, *CompleteWorkflow: - return completeWorkflowCommand, nil - case ContinueAsNew, *ContinueAsNew: - return continueAsNewCommand, nil - case SignalExternalWorkflow, *SignalExternalWorkflow: - return signalExternalWorkflowCommand, nil - case CancelExternalWorkflow, *CancelExternalWorkflow: - return cancelExternalWorkflowCommand, nil - case Cancel, *Cancel: - return cancelCommand, nil - case Panic, *Panic: - return panicCommand, nil - default: - return "", errors.E(op, errors.Errorf("undefined command type: %s", cmd)) - } -} - -// reads command from binary payload -func initCommand(name string) (interface{}, error) { - const op = errors.Op("init_command") - switch name { - case getWorkerInfoCommand: - return &GetWorkerInfo{}, nil - - case startWorkflowCommand: - return &StartWorkflow{}, nil - - case invokeSignalCommand: - return &InvokeSignal{}, nil - - case invokeQueryCommand: - return &InvokeQuery{}, nil - - case destroyWorkflowCommand: - return &DestroyWorkflow{}, nil - - case cancelWorkflowCommand: - return &CancelWorkflow{}, nil - - case getStackTraceCommand: - return &GetStackTrace{}, nil - - case invokeActivityCommand: - return &InvokeActivity{}, nil - - case executeActivityCommand: - return &ExecuteActivity{}, nil - - case executeChildWorkflowCommand: - return &ExecuteChildWorkflow{}, nil - - case getChildWorkflowExecutionCommand: - return &GetChildWorkflowExecution{}, nil - - case newTimerCommand: - return &NewTimer{}, nil - - case getVersionCommand: - return &GetVersion{}, nil - - case sideEffectCommand: - return &SideEffect{}, nil - - case completeWorkflowCommand: - return &CompleteWorkflow{}, nil - - case continueAsNewCommand: - return &ContinueAsNew{}, nil - - case signalExternalWorkflowCommand: - return &SignalExternalWorkflow{}, nil - - case cancelExternalWorkflowCommand: - return &CancelExternalWorkflow{}, nil - - case cancelCommand: - return &Cancel{}, nil - - case panicCommand: - return &Panic{}, nil - - default: - return nil, errors.E(op, errors.Errorf("undefined command name: %s", name)) - } -} diff --git a/plugins/temporal/protocol/proto_codec.go b/plugins/temporal/protocol/proto_codec.go deleted file mode 100644 index 607fe0fe..00000000 --- a/plugins/temporal/protocol/proto_codec.go +++ /dev/null @@ -1,145 +0,0 @@ -package protocol - -import ( - v1 "github.com/golang/protobuf/proto" //nolint:staticcheck - jsoniter "github.com/json-iterator/go" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/plugins/temporal/protocol/internal" - "google.golang.org/protobuf/proto" -) - -type ( - // ProtoCodec uses protobuf to exchange messages with underlying workers. - ProtoCodec struct { - } -) - -// NewProtoCodec creates new Proto communication codec. -func NewProtoCodec() Codec { - return &ProtoCodec{} -} - -// WithLogger creates new codes instance with attached logger. -func (c *ProtoCodec) WithLogger(logger logger.Logger) Codec { - return &ProtoCodec{} -} - -// GetName returns codec name. -func (c *ProtoCodec) GetName() string { - return "protobuf" -} - -// Execute exchanges commands with worker. -func (c *ProtoCodec) Execute(e Endpoint, ctx Context, msg ...Message) ([]Message, error) { - if len(msg) == 0 { - return nil, nil - } - - var request = &internal.Frame{} - var response = &internal.Frame{} - var result = make([]Message, 0, 5) - var err error - - for _, m := range msg { - frame, err := c.packMessage(m) - if err != nil { - return nil, err - } - - request.Messages = append(request.Messages, frame) - } - - p := payload.Payload{} - - // context is always in json format - if ctx.IsEmpty() { - p.Context = []byte("null") - } - - p.Context, err = jsoniter.Marshal(ctx) - if err != nil { - return nil, errors.E(errors.Op("encodeContext"), err) - } - - p.Body, err = proto.Marshal(v1.MessageV2(request)) - if err != nil { - return nil, errors.E(errors.Op("encodePayload"), err) - } - - out, err := e.Exec(p) - if err != nil { - return nil, errors.E(errors.Op("execute"), err) - } - - if len(out.Body) == 0 { - // worker inactive or closed - return nil, nil - } - - err = proto.Unmarshal(out.Body, v1.MessageV2(response)) - if err != nil { - return nil, errors.E(errors.Op("parseResponse"), err) - } - - for _, f := range response.Messages { - msg, err := c.parseMessage(f) - if err != nil { - return nil, err - } - - result = append(result, msg) - } - - return result, nil -} - -func (c *ProtoCodec) packMessage(msg Message) (*internal.Message, error) { - var err error - - frame := &internal.Message{ - Id: msg.ID, - Payloads: msg.Payloads, - Failure: msg.Failure, - } - - if msg.Command != nil { - frame.Command, err = commandName(msg.Command) - if err != nil { - return nil, err - } - - frame.Options, err = jsoniter.Marshal(msg.Command) - if err != nil { - return nil, err - } - } - - return frame, nil -} - -func (c *ProtoCodec) parseMessage(frame *internal.Message) (Message, error) { - const op = errors.Op("proto_codec_parse_message") - var err error - - msg := Message{ - ID: frame.Id, - Payloads: frame.Payloads, - Failure: frame.Failure, - } - - if frame.Command != "" { - msg.Command, err = initCommand(frame.Command) - if err != nil { - return Message{}, errors.E(op, err) - } - - err = jsoniter.Unmarshal(frame.Options, &msg.Command) - if err != nil { - return Message{}, errors.E(op, err) - } - } - - return msg, nil -} diff --git a/plugins/temporal/protocol/protocol.go b/plugins/temporal/protocol/protocol.go deleted file mode 100644 index 53076fdf..00000000 --- a/plugins/temporal/protocol/protocol.go +++ /dev/null @@ -1,77 +0,0 @@ -package protocol - -import ( - "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/spiral/roadrunner/v2/plugins/logger" - commonpb "go.temporal.io/api/common/v1" - "go.temporal.io/api/failure/v1" -) - -const ( - // DebugNone disables all debug messages. - DebugNone = iota - - // DebugNormal renders all messages into console. - DebugNormal - - // DebugHumanized enables color highlights for messages. - DebugHumanized -) - -// Context provides worker information about currently. Context can be empty for server level commands. -type Context struct { - // TaskQueue associates message batch with the specific task queue in underlying worker. - TaskQueue string `json:"taskQueue,omitempty"` - - // TickTime associated current or historical time with message batch. - TickTime string `json:"tickTime,omitempty"` - - // Replay indicates that current message batch is historical. - Replay bool `json:"replay,omitempty"` -} - -// Message used to exchange the send commands and receive responses from underlying workers. -type Message struct { - // ID contains ID of the command, response or error. - ID uint64 `json:"id"` - - // Command of the message in unmarshalled form. Pointer. - Command interface{} `json:"command,omitempty"` - - // Failure associated with command id. - Failure *failure.Failure `json:"failure,omitempty"` - - // Payloads contains message specific payloads in binary format. - Payloads *commonpb.Payloads `json:"payloads,omitempty"` -} - -// Codec manages payload encoding and decoding while communication with underlying worker. -type Codec interface { - // WithLogger creates new codes instance with attached logger. - WithLogger(logger.Logger) Codec - - // GetName returns codec name. - GetName() string - - // Execute sends message to worker and waits for the response. - Execute(e Endpoint, ctx Context, msg ...Message) ([]Message, error) -} - -// Endpoint provides the ability to send and receive messages. -type Endpoint interface { - // ExecWithContext allow to set ExecTTL - Exec(p payload.Payload) (payload.Payload, error) -} - -// DebugLevel configures debug level. -type DebugLevel int - -// IsEmpty only check if task queue set. -func (ctx Context) IsEmpty() bool { - return ctx.TaskQueue == "" -} - -// IsCommand returns true if message carries request. -func (msg Message) IsCommand() bool { - return msg.Command != nil -} diff --git a/plugins/temporal/protocol/worker_info.go b/plugins/temporal/protocol/worker_info.go deleted file mode 100644 index 58a0ae66..00000000 --- a/plugins/temporal/protocol/worker_info.go +++ /dev/null @@ -1,72 +0,0 @@ -package protocol - -import ( - "github.com/spiral/errors" - "go.temporal.io/sdk/converter" - "go.temporal.io/sdk/worker" -) - -// WorkerInfo outlines information about every available worker and it's TaskQueues. - -// WorkerInfo lists available task queues, workflows and activities. -type WorkerInfo struct { - // TaskQueue assigned to the worker. - TaskQueue string `json:"taskQueue"` - - // Options describe worker options. - Options worker.Options `json:"options,omitempty"` - - // Workflows provided by the worker. - Workflows []WorkflowInfo - - // Activities provided by the worker. - Activities []ActivityInfo -} - -// WorkflowInfo describes single worker workflow. -type WorkflowInfo struct { - // Name of the workflow. - Name string `json:"name"` - - // Queries pre-defined for the workflow type. - Queries []string `json:"queries"` - - // Signals pre-defined for the workflow type. - Signals []string `json:"signals"` -} - -// ActivityInfo describes single worker activity. -type ActivityInfo struct { - // Name describes public activity name. - Name string `json:"name"` -} - -// FetchWorkerInfo fetches information about all underlying workers (can be multiplexed inside single process). -func FetchWorkerInfo(c Codec, e Endpoint, dc converter.DataConverter) ([]WorkerInfo, error) { - const op = errors.Op("fetch_worker_info") - - result, err := c.Execute(e, Context{}, Message{ID: 0, Command: GetWorkerInfo{}}) - if err != nil { - return nil, errors.E(op, err) - } - - if len(result) != 1 { - return nil, errors.E(op, errors.Str("unable to read worker info")) - } - - if result[0].ID != 0 { - return nil, errors.E(op, errors.Str("FetchWorkerInfo confirmation missing")) - } - - var info []WorkerInfo - for i := range result[0].Payloads.Payloads { - wi := WorkerInfo{} - if err := dc.FromPayload(result[0].Payloads.Payloads[i], &wi); err != nil { - return nil, errors.E(op, err) - } - - info = append(info, wi) - } - - return info, nil -} diff --git a/plugins/temporal/workflow/canceller.go b/plugins/temporal/workflow/canceller.go deleted file mode 100644 index 962c527f..00000000 --- a/plugins/temporal/workflow/canceller.go +++ /dev/null @@ -1,41 +0,0 @@ -package workflow - -import ( - "sync" -) - -type cancellable func() error - -type canceller struct { - ids sync.Map -} - -func (c *canceller) register(id uint64, cancel cancellable) { - c.ids.Store(id, cancel) -} - -func (c *canceller) discard(id uint64) { - c.ids.Delete(id) -} - -func (c *canceller) cancel(ids ...uint64) error { - var err error - for _, id := range ids { - cancel, ok := c.ids.Load(id) - if ok == false { - continue - } - - // TODO return when minimum supported version will be go 1.15 - // go1.14 don't have LoadAndDelete method - // It was introduced only in go1.15 - c.ids.Delete(id) - - err = cancel.(cancellable)() - if err != nil { - return err - } - } - - return nil -} diff --git a/plugins/temporal/workflow/canceller_test.go b/plugins/temporal/workflow/canceller_test.go deleted file mode 100644 index d6e846f8..00000000 --- a/plugins/temporal/workflow/canceller_test.go +++ /dev/null @@ -1,33 +0,0 @@ -package workflow - -import ( - "errors" - "testing" - - "github.com/stretchr/testify/assert" -) - -func Test_CancellerNoListeners(t *testing.T) { - c := &canceller{} - - assert.NoError(t, c.cancel(1)) -} - -func Test_CancellerListenerError(t *testing.T) { - c := &canceller{} - c.register(1, func() error { - return errors.New("failed") - }) - - assert.Error(t, c.cancel(1)) -} - -func Test_CancellerListenerDiscarded(t *testing.T) { - c := &canceller{} - c.register(1, func() error { - return errors.New("failed") - }) - - c.discard(1) - assert.NoError(t, c.cancel(1)) -} diff --git a/plugins/temporal/workflow/id_registry.go b/plugins/temporal/workflow/id_registry.go deleted file mode 100644 index ac75cbda..00000000 --- a/plugins/temporal/workflow/id_registry.go +++ /dev/null @@ -1,51 +0,0 @@ -package workflow - -import ( - "sync" - - bindings "go.temporal.io/sdk/internalbindings" -) - -// used to gain access to child workflow ids after they become available via callback result. -type idRegistry struct { - mu sync.Mutex - ids map[uint64]entry - listeners map[uint64]listener -} - -type listener func(w bindings.WorkflowExecution, err error) - -type entry struct { - w bindings.WorkflowExecution - err error -} - -func newIDRegistry() *idRegistry { - return &idRegistry{ - ids: map[uint64]entry{}, - listeners: map[uint64]listener{}, - } -} - -func (c *idRegistry) listen(id uint64, cl listener) { - c.mu.Lock() - defer c.mu.Unlock() - - c.listeners[id] = cl - - if e, ok := c.ids[id]; ok { - cl(e.w, e.err) - } -} - -func (c *idRegistry) push(id uint64, w bindings.WorkflowExecution, err error) { - c.mu.Lock() - defer c.mu.Unlock() - - e := entry{w: w, err: err} - c.ids[id] = e - - if l, ok := c.listeners[id]; ok { - l(e.w, e.err) - } -} diff --git a/plugins/temporal/workflow/message_queue.go b/plugins/temporal/workflow/message_queue.go deleted file mode 100644 index 8f4409d1..00000000 --- a/plugins/temporal/workflow/message_queue.go +++ /dev/null @@ -1,47 +0,0 @@ -package workflow - -import ( - rrt "github.com/spiral/roadrunner/v2/plugins/temporal/protocol" - "go.temporal.io/api/common/v1" - "go.temporal.io/api/failure/v1" -) - -type messageQueue struct { - seqID func() uint64 - queue []rrt.Message -} - -func newMessageQueue(sedID func() uint64) *messageQueue { - return &messageQueue{ - seqID: sedID, - queue: make([]rrt.Message, 0, 5), - } -} - -func (mq *messageQueue) flush() { - mq.queue = mq.queue[0:0] -} - -func (mq *messageQueue) allocateMessage(cmd interface{}, payloads *common.Payloads) (uint64, rrt.Message) { - msg := rrt.Message{ - ID: mq.seqID(), - Command: cmd, - Payloads: payloads, - } - - return msg.ID, msg -} - -func (mq *messageQueue) pushCommand(cmd interface{}, payloads *common.Payloads) uint64 { - id, msg := mq.allocateMessage(cmd, payloads) - mq.queue = append(mq.queue, msg) - return id -} - -func (mq *messageQueue) pushResponse(id uint64, payloads *common.Payloads) { - mq.queue = append(mq.queue, rrt.Message{ID: id, Payloads: payloads}) -} - -func (mq *messageQueue) pushError(id uint64, failure *failure.Failure) { - mq.queue = append(mq.queue, rrt.Message{ID: id, Failure: failure}) -} diff --git a/plugins/temporal/workflow/message_queue_test.go b/plugins/temporal/workflow/message_queue_test.go deleted file mode 100644 index 1fcd409f..00000000 --- a/plugins/temporal/workflow/message_queue_test.go +++ /dev/null @@ -1,53 +0,0 @@ -package workflow - -import ( - "sync/atomic" - "testing" - - "github.com/spiral/roadrunner/v2/plugins/temporal/protocol" - "github.com/stretchr/testify/assert" - "go.temporal.io/api/common/v1" - "go.temporal.io/api/failure/v1" -) - -func Test_MessageQueueFlushError(t *testing.T) { - var index uint64 - mq := newMessageQueue(func() uint64 { - return atomic.AddUint64(&index, 1) - }) - - mq.pushError(1, &failure.Failure{}) - assert.Len(t, mq.queue, 1) - - mq.flush() - assert.Len(t, mq.queue, 0) - assert.Equal(t, uint64(0), index) -} - -func Test_MessageQueueFlushResponse(t *testing.T) { - var index uint64 - mq := newMessageQueue(func() uint64 { - return atomic.AddUint64(&index, 1) - }) - - mq.pushResponse(1, &common.Payloads{}) - assert.Len(t, mq.queue, 1) - - mq.flush() - assert.Len(t, mq.queue, 0) - assert.Equal(t, uint64(0), index) -} - -func Test_MessageQueueCommandID(t *testing.T) { - var index uint64 - mq := newMessageQueue(func() uint64 { - return atomic.AddUint64(&index, 1) - }) - - n := mq.pushCommand(protocol.StartWorkflow{}, &common.Payloads{}) - assert.Equal(t, n, index) - assert.Len(t, mq.queue, 1) - - mq.flush() - assert.Len(t, mq.queue, 0) -} diff --git a/plugins/temporal/workflow/plugin.go b/plugins/temporal/workflow/plugin.go deleted file mode 100644 index 572d9a3b..00000000 --- a/plugins/temporal/workflow/plugin.go +++ /dev/null @@ -1,203 +0,0 @@ -package workflow - -import ( - "context" - "sync" - "sync/atomic" - "time" - - "github.com/cenkalti/backoff/v4" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/events" - "github.com/spiral/roadrunner/v2/pkg/worker" - "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/plugins/server" - "github.com/spiral/roadrunner/v2/plugins/temporal/client" -) - -const ( - // PluginName defines public service name. - PluginName = "workflows" - - // RRMode sets as RR_MODE env variable to let worker know about the mode to run. - RRMode = "temporal/workflow" -) - -// Plugin manages workflows and workers. -type Plugin struct { - temporal client.Temporal - events events.Handler - server server.Server - log logger.Logger - mu sync.Mutex - reset chan struct{} - pool workflowPool - closing int64 -} - -// Init workflow plugin. -func (p *Plugin) Init(temporal client.Temporal, server server.Server, log logger.Logger) error { - p.temporal = temporal - p.server = server - p.events = events.NewEventsHandler() - p.log = log - p.reset = make(chan struct{}, 1) - - return nil -} - -// Serve starts workflow service. -func (p *Plugin) Serve() chan error { - const op = errors.Op("workflow_plugin_serve") - errCh := make(chan error, 1) - - pool, err := p.startPool() - if err != nil { - errCh <- errors.E(op, err) - return errCh - } - - p.pool = pool - - go func() { - for { - select { - case <-p.reset: - if atomic.LoadInt64(&p.closing) == 1 { - return - } - - err := p.replacePool() - if err == nil { - continue - } - - bkoff := backoff.NewExponentialBackOff() - bkoff.InitialInterval = time.Second - - err = backoff.Retry(p.replacePool, bkoff) - if err != nil { - errCh <- errors.E(op, err) - } - } - } - }() - - return errCh -} - -// Stop workflow service. -func (p *Plugin) Stop() error { - const op = errors.Op("workflow_plugin_stop") - atomic.StoreInt64(&p.closing, 1) - - pool := p.getPool() - if pool != nil { - p.pool = nil - err := pool.Destroy(context.Background()) - if err != nil { - return errors.E(op, err) - } - return nil - } - - return nil -} - -// Name of the service. -func (p *Plugin) Name() string { - return PluginName -} - -// Workers returns list of available workflow workers. -func (p *Plugin) Workers() []worker.BaseProcess { - p.mu.Lock() - defer p.mu.Unlock() - return p.pool.Workers() -} - -// WorkflowNames returns list of all available workflows. -func (p *Plugin) WorkflowNames() []string { - return p.pool.WorkflowNames() -} - -// Reset resets underlying workflow pool with new copy. -func (p *Plugin) Reset() error { - p.reset <- struct{}{} - - return nil -} - -// AddListener adds event listeners to the service. -func (p *Plugin) poolListener(event interface{}) { - if ev, ok := event.(PoolEvent); ok { - if ev.Event == eventWorkerExit { - if ev.Caused != nil { - p.log.Error("Workflow pool error", "error", ev.Caused) - } - p.reset <- struct{}{} - } - } - - p.events.Push(event) -} - -// AddListener adds event listeners to the service. -func (p *Plugin) startPool() (workflowPool, error) { - const op = errors.Op("workflow_plugin_start_pool") - pool, err := newWorkflowPool( - p.temporal.GetCodec().WithLogger(p.log), - p.poolListener, - p.server, - ) - if err != nil { - return nil, errors.E(op, err) - } - - err = pool.Start(context.Background(), p.temporal) - if err != nil { - return nil, errors.E(op, err) - } - - p.log.Debug("Started workflow processing", "workflows", pool.WorkflowNames()) - - return pool, nil -} - -func (p *Plugin) replacePool() error { - p.mu.Lock() - const op = errors.Op("workflow_plugin_replace_pool") - defer p.mu.Unlock() - - if p.pool != nil { - err := p.pool.Destroy(context.Background()) - p.pool = nil - if err != nil { - p.log.Error( - "Unable to destroy expired workflow pool", - "error", - errors.E(op, err), - ) - return errors.E(op, err) - } - } - - pool, err := p.startPool() - if err != nil { - p.log.Error("Replace workflow pool failed", "error", err) - return errors.E(op, err) - } - - p.pool = pool - p.log.Debug("workflow pool successfully replaced") - - return nil -} - -// getPool returns currently pool. -func (p *Plugin) getPool() workflowPool { - p.mu.Lock() - defer p.mu.Unlock() - - return p.pool -} diff --git a/plugins/temporal/workflow/process.go b/plugins/temporal/workflow/process.go deleted file mode 100644 index 45e6885c..00000000 --- a/plugins/temporal/workflow/process.go +++ /dev/null @@ -1,436 +0,0 @@ -package workflow - -import ( - "strconv" - "sync/atomic" - "time" - - "github.com/spiral/errors" - rrt "github.com/spiral/roadrunner/v2/plugins/temporal/protocol" - commonpb "go.temporal.io/api/common/v1" - bindings "go.temporal.io/sdk/internalbindings" - "go.temporal.io/sdk/workflow" -) - -// wraps single workflow process -type workflowProcess struct { - codec rrt.Codec - pool workflowPool - env bindings.WorkflowEnvironment - header *commonpb.Header - mq *messageQueue - ids *idRegistry - seqID uint64 - runID string - pipeline []rrt.Message - callbacks []func() error - canceller *canceller - inLoop bool -} - -// Execute workflow, bootstraps process. -func (wf *workflowProcess) Execute(env bindings.WorkflowEnvironment, header *commonpb.Header, input *commonpb.Payloads) { - wf.env = env - wf.header = header - wf.seqID = 0 - wf.runID = env.WorkflowInfo().WorkflowExecution.RunID - wf.canceller = &canceller{} - - // sequenceID shared for all worker workflows - wf.mq = newMessageQueue(wf.pool.SeqID) - wf.ids = newIDRegistry() - - env.RegisterCancelHandler(wf.handleCancel) - env.RegisterSignalHandler(wf.handleSignal) - env.RegisterQueryHandler(wf.handleQuery) - - var ( - lastCompletion = bindings.GetLastCompletionResult(env) - lastCompletionOffset = 0 - ) - - if lastCompletion != nil && len(lastCompletion.Payloads) != 0 { - if input == nil { - input = &commonpb.Payloads{Payloads: []*commonpb.Payload{}} - } - - input.Payloads = append(input.Payloads, lastCompletion.Payloads...) - lastCompletionOffset = len(lastCompletion.Payloads) - } - - _ = wf.mq.pushCommand( - rrt.StartWorkflow{ - Info: env.WorkflowInfo(), - LastCompletion: lastCompletionOffset, - }, - input, - ) -} - -// OnWorkflowTaskStarted handles single workflow tick and batch of pipeline from temporal server. -func (wf *workflowProcess) OnWorkflowTaskStarted() { - wf.inLoop = true - defer func() { wf.inLoop = false }() - - var err error - for _, callback := range wf.callbacks { - err = callback() - if err != nil { - panic(err) - } - } - wf.callbacks = nil - - if err := wf.flushQueue(); err != nil { - panic(err) - } - - for len(wf.pipeline) > 0 { - msg := wf.pipeline[0] - wf.pipeline = wf.pipeline[1:] - - if msg.IsCommand() { - err = wf.handleMessage(msg) - } - - if err != nil { - panic(err) - } - } -} - -// StackTrace renders workflow stack trace. -func (wf *workflowProcess) StackTrace() string { - result, err := wf.runCommand( - rrt.GetStackTrace{ - RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID, - }, - nil, - ) - - if err != nil { - return err.Error() - } - - var stacktrace string - err = wf.env.GetDataConverter().FromPayload(result.Payloads.Payloads[0], &stacktrace) - if err != nil { - return err.Error() - } - - return stacktrace -} - -// Close the workflow. -func (wf *workflowProcess) Close() { - // TODO: properly handle errors - // panic(err) - - _ = wf.mq.pushCommand( - rrt.DestroyWorkflow{RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID}, - nil, - ) - - _, _ = wf.discardQueue() -} - -// execution context. -func (wf *workflowProcess) getContext() rrt.Context { - return rrt.Context{ - TaskQueue: wf.env.WorkflowInfo().TaskQueueName, - TickTime: wf.env.Now().Format(time.RFC3339), - Replay: wf.env.IsReplaying(), - } -} - -// schedule cancel command -func (wf *workflowProcess) handleCancel() { - _ = wf.mq.pushCommand( - rrt.CancelWorkflow{RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID}, - nil, - ) -} - -// schedule the signal processing -func (wf *workflowProcess) handleSignal(name string, input *commonpb.Payloads) { - _ = wf.mq.pushCommand( - rrt.InvokeSignal{ - RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID, - Name: name, - }, - input, - ) -} - -// Handle query in blocking mode. -func (wf *workflowProcess) handleQuery(queryType string, queryArgs *commonpb.Payloads) (*commonpb.Payloads, error) { - result, err := wf.runCommand( - rrt.InvokeQuery{ - RunID: wf.runID, - Name: queryType, - }, - queryArgs, - ) - - if err != nil { - return nil, err - } - - if result.Failure != nil { - return nil, bindings.ConvertFailureToError(result.Failure, wf.env.GetDataConverter()) - } - - return result.Payloads, nil -} - -// process incoming command -func (wf *workflowProcess) handleMessage(msg rrt.Message) error { - const op = errors.Op("handleMessage") - var err error - - var ( - id = msg.ID - cmd = msg.Command - payloads = msg.Payloads - ) - - switch cmd := cmd.(type) { - case *rrt.ExecuteActivity: - params := cmd.ActivityParams(wf.env, payloads) - activityID := wf.env.ExecuteActivity(params, wf.createCallback(id)) - - wf.canceller.register(id, func() error { - wf.env.RequestCancelActivity(activityID) - return nil - }) - - case *rrt.ExecuteChildWorkflow: - params := cmd.WorkflowParams(wf.env, payloads) - - // always use deterministic id - if params.WorkflowID == "" { - nextID := atomic.AddUint64(&wf.seqID, 1) - params.WorkflowID = wf.env.WorkflowInfo().WorkflowExecution.RunID + "_" + strconv.Itoa(int(nextID)) - } - - wf.env.ExecuteChildWorkflow(params, wf.createCallback(id), func(r bindings.WorkflowExecution, e error) { - wf.ids.push(id, r, e) - }) - - wf.canceller.register(id, func() error { - wf.env.RequestCancelChildWorkflow(params.Namespace, params.WorkflowID) - return nil - }) - - case *rrt.GetChildWorkflowExecution: - wf.ids.listen(cmd.ID, func(w bindings.WorkflowExecution, err error) { - cl := wf.createCallback(id) - - // TODO rewrite - if err != nil { - panic(err) - } - - p, err := wf.env.GetDataConverter().ToPayloads(w) - if err != nil { - panic(err) - } - - cl(p, err) - }) - - case *rrt.NewTimer: - timerID := wf.env.NewTimer(cmd.ToDuration(), wf.createCallback(id)) - wf.canceller.register(id, func() error { - if timerID != nil { - wf.env.RequestCancelTimer(*timerID) - } - return nil - }) - - case *rrt.GetVersion: - version := wf.env.GetVersion( - cmd.ChangeID, - workflow.Version(cmd.MinSupported), - workflow.Version(cmd.MaxSupported), - ) - - result, err := wf.env.GetDataConverter().ToPayloads(version) - if err != nil { - return errors.E(op, err) - } - - wf.mq.pushResponse(id, result) - err = wf.flushQueue() - if err != nil { - panic(err) - } - - case *rrt.SideEffect: - wf.env.SideEffect( - func() (*commonpb.Payloads, error) { - return payloads, nil - }, - wf.createContinuableCallback(id), - ) - - case *rrt.CompleteWorkflow: - result, _ := wf.env.GetDataConverter().ToPayloads("completed") - wf.mq.pushResponse(id, result) - - if msg.Failure == nil { - wf.env.Complete(payloads, nil) - } else { - wf.env.Complete(nil, bindings.ConvertFailureToError(msg.Failure, wf.env.GetDataConverter())) - } - - case *rrt.ContinueAsNew: - result, _ := wf.env.GetDataConverter().ToPayloads("completed") - wf.mq.pushResponse(id, result) - - wf.env.Complete(nil, &workflow.ContinueAsNewError{ - WorkflowType: &bindings.WorkflowType{Name: cmd.Name}, - Input: payloads, - Header: wf.header, - TaskQueueName: cmd.Options.TaskQueueName, - WorkflowExecutionTimeout: cmd.Options.WorkflowExecutionTimeout, - WorkflowRunTimeout: cmd.Options.WorkflowRunTimeout, - WorkflowTaskTimeout: cmd.Options.WorkflowTaskTimeout, - }) - - case *rrt.SignalExternalWorkflow: - wf.env.SignalExternalWorkflow( - cmd.Namespace, - cmd.WorkflowID, - cmd.RunID, - cmd.Signal, - payloads, - nil, - cmd.ChildWorkflowOnly, - wf.createCallback(id), - ) - - case *rrt.CancelExternalWorkflow: - wf.env.RequestCancelExternalWorkflow(cmd.Namespace, cmd.WorkflowID, cmd.RunID, wf.createCallback(id)) - - case *rrt.Cancel: - err = wf.canceller.cancel(cmd.CommandIDs...) - if err != nil { - return errors.E(op, err) - } - - result, _ := wf.env.GetDataConverter().ToPayloads("completed") - wf.mq.pushResponse(id, result) - - err = wf.flushQueue() - if err != nil { - panic(err) - } - - case *rrt.Panic: - panic(errors.E(cmd.Message)) - - default: - panic("undefined command") - } - - return nil -} - -func (wf *workflowProcess) createCallback(id uint64) bindings.ResultHandler { - callback := func(result *commonpb.Payloads, err error) error { - wf.canceller.discard(id) - - if err != nil { - wf.mq.pushError(id, bindings.ConvertErrorToFailure(err, wf.env.GetDataConverter())) - return nil - } - - // fetch original payload - wf.mq.pushResponse(id, result) - return nil - } - - return func(result *commonpb.Payloads, err error) { - // timer cancel callback can happen inside the loop - if wf.inLoop { - err := callback(result, err) - if err != nil { - panic(err) - } - - return - } - - wf.callbacks = append(wf.callbacks, func() error { - return callback(result, err) - }) - } -} - -// callback to be called inside the queue processing, adds new messages at the end of the queue -func (wf *workflowProcess) createContinuableCallback(id uint64) bindings.ResultHandler { - callback := func(result *commonpb.Payloads, err error) { - wf.canceller.discard(id) - - if err != nil { - wf.mq.pushError(id, bindings.ConvertErrorToFailure(err, wf.env.GetDataConverter())) - return - } - - wf.mq.pushResponse(id, result) - err = wf.flushQueue() - if err != nil { - panic(err) - } - } - - return func(result *commonpb.Payloads, err error) { - callback(result, err) - } -} - -// Exchange messages between host and worker processes and add new commands to the queue. -func (wf *workflowProcess) flushQueue() error { - const op = errors.Op("flush queue") - messages, err := wf.codec.Execute(wf.pool, wf.getContext(), wf.mq.queue...) - wf.mq.flush() - - if err != nil { - return errors.E(op, err) - } - - wf.pipeline = append(wf.pipeline, messages...) - - return nil -} - -// Exchange messages between host and worker processes without adding new commands to the queue. -func (wf *workflowProcess) discardQueue() ([]rrt.Message, error) { - const op = errors.Op("discard queue") - messages, err := wf.codec.Execute(wf.pool, wf.getContext(), wf.mq.queue...) - wf.mq.flush() - - if err != nil { - return nil, errors.E(op, err) - } - - return messages, nil -} - -// Run single command and return single result. -func (wf *workflowProcess) runCommand(cmd interface{}, payloads *commonpb.Payloads) (rrt.Message, error) { - const op = errors.Op("workflow_process_runcommand") - _, msg := wf.mq.allocateMessage(cmd, payloads) - - result, err := wf.codec.Execute(wf.pool, wf.getContext(), msg) - if err != nil { - return rrt.Message{}, errors.E(op, err) - } - - if len(result) != 1 { - return rrt.Message{}, errors.E(op, errors.Str("unexpected worker response")) - } - - return result[0], nil -} diff --git a/plugins/temporal/workflow/workflow_pool.go b/plugins/temporal/workflow/workflow_pool.go deleted file mode 100644 index b9ed46c8..00000000 --- a/plugins/temporal/workflow/workflow_pool.go +++ /dev/null @@ -1,190 +0,0 @@ -package workflow - -import ( - "context" - "sync" - "sync/atomic" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/events" - "github.com/spiral/roadrunner/v2/pkg/payload" - rrWorker "github.com/spiral/roadrunner/v2/pkg/worker" - "github.com/spiral/roadrunner/v2/plugins/server" - "github.com/spiral/roadrunner/v2/plugins/temporal/client" - rrt "github.com/spiral/roadrunner/v2/plugins/temporal/protocol" - bindings "go.temporal.io/sdk/internalbindings" - "go.temporal.io/sdk/worker" - "go.temporal.io/sdk/workflow" -) - -const eventWorkerExit = 8390 - -// RR_MODE env variable key -const RR_MODE = "RR_MODE" //nolint - -// RR_CODEC env variable key -const RR_CODEC = "RR_CODEC" //nolint - -type workflowPool interface { - SeqID() uint64 - Exec(p payload.Payload) (payload.Payload, error) - Start(ctx context.Context, temporal client.Temporal) error - Destroy(ctx context.Context) error - Workers() []rrWorker.BaseProcess - WorkflowNames() []string -} - -// PoolEvent triggered on workflow pool worker events. -type PoolEvent struct { - Event int - Context interface{} - Caused error -} - -// workflowPoolImpl manages workflowProcess executions between worker restarts. -type workflowPoolImpl struct { - codec rrt.Codec - seqID uint64 - workflows map[string]rrt.WorkflowInfo - tWorkers []worker.Worker - mu sync.Mutex - worker rrWorker.SyncWorker - active bool -} - -// newWorkflowPool creates new workflow pool. -func newWorkflowPool(codec rrt.Codec, listener events.Listener, factory server.Server) (workflowPool, error) { - const op = errors.Op("new_workflow_pool") - w, err := factory.NewWorker( - context.Background(), - map[string]string{RR_MODE: RRMode, RR_CODEC: codec.GetName()}, - listener, - ) - if err != nil { - return nil, errors.E(op, err) - } - - go func() { - err := w.Wait() - listener(PoolEvent{Event: eventWorkerExit, Caused: err}) - }() - - return &workflowPoolImpl{codec: codec, worker: rrWorker.From(w)}, nil -} - -// Start the pool in non blocking mode. -func (pool *workflowPoolImpl) Start(ctx context.Context, temporal client.Temporal) error { - const op = errors.Op("workflow_pool_start") - pool.mu.Lock() - pool.active = true - pool.mu.Unlock() - - err := pool.initWorkers(ctx, temporal) - if err != nil { - return errors.E(op, err) - } - - for i := 0; i < len(pool.tWorkers); i++ { - err := pool.tWorkers[i].Start() - if err != nil { - return errors.E(op, err) - } - } - - return nil -} - -// Active. -func (pool *workflowPoolImpl) Active() bool { - return pool.active -} - -// Destroy stops all temporal workers and application worker. -func (pool *workflowPoolImpl) Destroy(ctx context.Context) error { - pool.mu.Lock() - defer pool.mu.Unlock() - const op = errors.Op("workflow_pool_destroy") - - pool.active = false - for i := 0; i < len(pool.tWorkers); i++ { - pool.tWorkers[i].Stop() - } - - worker.PurgeStickyWorkflowCache() - - err := pool.worker.Stop() - if err != nil { - return errors.E(op, err) - } - - return nil -} - -// NewWorkflowDefinition initiates new workflow process. -func (pool *workflowPoolImpl) NewWorkflowDefinition() bindings.WorkflowDefinition { - return &workflowProcess{ - codec: pool.codec, - pool: pool, - } -} - -// NewWorkflowDefinition initiates new workflow process. -func (pool *workflowPoolImpl) SeqID() uint64 { - return atomic.AddUint64(&pool.seqID, 1) -} - -// Exec set of commands in thread safe move. -func (pool *workflowPoolImpl) Exec(p payload.Payload) (payload.Payload, error) { - pool.mu.Lock() - defer pool.mu.Unlock() - - if !pool.active { - return payload.Payload{}, nil - } - - return pool.worker.Exec(p) -} - -func (pool *workflowPoolImpl) Workers() []rrWorker.BaseProcess { - return []rrWorker.BaseProcess{pool.worker} -} - -func (pool *workflowPoolImpl) WorkflowNames() []string { - names := make([]string, 0, len(pool.workflows)) - for name := range pool.workflows { - names = append(names, name) - } - - return names -} - -// initWorkers request workers workflows from underlying PHP and configures temporal workers linked to the pool. -func (pool *workflowPoolImpl) initWorkers(ctx context.Context, temporal client.Temporal) error { - const op = errors.Op("workflow_pool_init_workers") - workerInfo, err := rrt.FetchWorkerInfo(pool.codec, pool, temporal.GetDataConverter()) - if err != nil { - return errors.E(op, err) - } - - pool.workflows = make(map[string]rrt.WorkflowInfo) - pool.tWorkers = make([]worker.Worker, 0) - - for _, info := range workerInfo { - w, err := temporal.CreateWorker(info.TaskQueue, info.Options) - if err != nil { - return errors.E(op, err, pool.Destroy(ctx)) - } - - pool.tWorkers = append(pool.tWorkers, w) - for _, workflowInfo := range info.Workflows { - w.RegisterWorkflowWithOptions(pool, workflow.RegisterOptions{ - Name: workflowInfo.Name, - DisableAlreadyRegisteredCheck: false, - }) - - pool.workflows[workflowInfo.Name] = workflowInfo - } - } - - return nil -} diff --git a/tests/composer.json b/tests/composer.json index 060f105e..0cf74581 100644 --- a/tests/composer.json +++ b/tests/composer.json @@ -6,10 +6,5 @@ "spiral/roadrunner": "^2.0", "spiral/roadrunner-http": "^2.0", "temporal/sdk": "dev-master" - }, - "autoload": { - "psr-4": { - "Temporal\\Tests\\": "temporal" - } } } diff --git a/tests/plugins/temporal/.rr.yaml b/tests/plugins/temporal/.rr.yaml deleted file mode 100644 index 04d0730d..00000000 --- a/tests/plugins/temporal/.rr.yaml +++ /dev/null @@ -1,22 +0,0 @@ -# Application configuration -rpc: - listen: tcp://127.0.0.1:6001 - -server: - command: "php worker.php" - -# Workflow and activity mesh service -temporal: - address: "localhost:7233" - activities: - num_workers: 4 - codec: json - debug_level: 2 - -logs: - mode: none - channels: - activities: - mode: none - #workflows: - #mode: none
\ No newline at end of file diff --git a/tests/plugins/temporal/cancel_test.go b/tests/plugins/temporal/cancel_test.go deleted file mode 100644 index 0fd3c126..00000000 --- a/tests/plugins/temporal/cancel_test.go +++ /dev/null @@ -1,291 +0,0 @@ -package tests - -import ( - "context" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "go.temporal.io/api/enums/v1" - "go.temporal.io/api/history/v1" - "go.temporal.io/sdk/client" -) - -func Test_SimpleWorkflowCancel(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "SimpleSignalledWorkflow") - assert.NoError(t, err) - - time.Sleep(time.Millisecond * 500) - err = s.Client().CancelWorkflow(context.Background(), w.GetID(), w.GetRunID()) - assert.NoError(t, err) - - var result interface{} - assert.Error(t, w.Get(context.Background(), &result)) - - we, err := s.Client().DescribeWorkflowExecution(context.Background(), w.GetID(), w.GetRunID()) - assert.NoError(t, err) - - assert.Equal(t, "Canceled", we.WorkflowExecutionInfo.Status.String()) -} - -func Test_CancellableWorkflowScope(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "CancelledScopeWorkflow", - "Hello World", - ) - assert.NoError(t, err) - - var result string - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, "yes", result) - - s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool { - return event.EventType == enums.EVENT_TYPE_TIMER_CANCELED - }) - - s.AssertNotContainsEvent(t, w, func(event *history.HistoryEvent) bool { - return event.EventType == enums.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED - }) -} - -func Test_CancelledWorkflow(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "CancelledWorkflow", - "Hello World", - ) - assert.NoError(t, err) - - time.Sleep(time.Second) - err = s.Client().CancelWorkflow(context.Background(), w.GetID(), w.GetRunID()) - assert.NoError(t, err) - - var result interface{} - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, "CANCELLED", result) -} - -func Test_CancelledWithCompensationWorkflow(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "CancelledWithCompensationWorkflow", - "Hello World", - ) - assert.NoError(t, err) - - time.Sleep(time.Second) - err = s.Client().CancelWorkflow(context.Background(), w.GetID(), w.GetRunID()) - assert.NoError(t, err) - - var result interface{} - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, "OK", result) - - e, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "getStatus") - assert.NoError(t, err) - - trace := make([]string, 0) - assert.NoError(t, e.Get(&trace)) - assert.Equal( - t, - []string{ - "yield", - "rollback", - "captured retry", - "captured promise on cancelled", - "START rollback", - "WAIT ROLLBACK", - "RESULT (ROLLBACK)", "DONE rollback", - "COMPLETE rollback", - "result: OK", - }, - trace, - ) -} - -func Test_CancelledNestedWorkflow(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "CancelledNestedWorkflow", - ) - assert.NoError(t, err) - - time.Sleep(time.Second) - - err = s.Client().CancelWorkflow(context.Background(), w.GetID(), w.GetRunID()) - assert.NoError(t, err) - - var result interface{} - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, "CANCELLED", result) - - e, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "getStatus") - assert.NoError(t, err) - - trace := make([]string, 0) - assert.NoError(t, e.Get(&trace)) - assert.Equal( - t, - []string{ - "begin", - "first scope", - "second scope", - "close second scope", - "close first scope", - "second scope cancelled", - "first scope cancelled", - "close process", - }, - trace, - ) -} - -func Test_CancelledNSingleScopeWorkflow(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "CancelledSingleScopeWorkflow", - ) - assert.NoError(t, err) - - time.Sleep(time.Second) - - err = s.Client().CancelWorkflow(context.Background(), w.GetID(), w.GetRunID()) - assert.NoError(t, err) - - var result interface{} - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, "OK", result) - - e, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "getStatus") - assert.NoError(t, err) - - trace := make([]string, 0) - assert.NoError(t, e.Get(&trace)) - assert.Equal( - t, - []string{ - "start", - "in scope", - "on cancel", - "captured in scope", - "captured in process", - }, - trace, - ) -} - -func Test_CancelledMidflightWorkflow(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "CancelledMidflightWorkflow", - ) - assert.NoError(t, err) - - var result interface{} - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, "OK", result) - - e, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "getStatus") - assert.NoError(t, err) - - trace := make([]string, 0) - assert.NoError(t, e.Get(&trace)) - assert.Equal( - t, - []string{ - "start", - "in scope", - "on cancel", - "done cancel", - }, - trace, - ) - - s.AssertNotContainsEvent(t, w, func(event *history.HistoryEvent) bool { - return event.EventType == enums.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED - }) -} - -func Test_CancelSignalledChildWorkflow(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "CancelSignalledChildWorkflow", - ) - assert.NoError(t, err) - - var result interface{} - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, "cancelled ok", result) - - e, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "getStatus") - assert.NoError(t, err) - - trace := make([]string, 0) - assert.NoError(t, e.Get(&trace)) - assert.Equal( - t, - []string{ - "start", - "child started", - "child signalled", - "scope cancelled", - "process done", - }, - trace, - ) - - s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool { - return event.EventType == enums.EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED - }) -} diff --git a/tests/plugins/temporal/child_test.go b/tests/plugins/temporal/child_test.go deleted file mode 100644 index 49521791..00000000 --- a/tests/plugins/temporal/child_test.go +++ /dev/null @@ -1,84 +0,0 @@ -package tests - -import ( - "context" - "testing" - - "github.com/stretchr/testify/assert" - "go.temporal.io/sdk/client" -) - -func Test_ExecuteChildWorkflow(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "WithChildWorkflow", - "Hello World", - ) - assert.NoError(t, err) - - var result string - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, "Child: CHILD HELLO WORLD", result) -} - -func Test_ExecuteChildStubWorkflow(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "WithChildStubWorkflow", - "Hello World", - ) - assert.NoError(t, err) - - var result string - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, "Child: CHILD HELLO WORLD", result) -} - -func Test_ExecuteChildStubWorkflow_02(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "ChildStubWorkflow", - "Hello World", - ) - assert.NoError(t, err) - - var result []string - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, []string{"HELLO WORLD", "UNTYPED"}, result) -} - -func Test_SignalChildViaStubWorkflow(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "SignalChildViaStubWorkflow", - ) - assert.NoError(t, err) - - var result int - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, 8, result) -} diff --git a/tests/plugins/temporal/disaster_test.go b/tests/plugins/temporal/disaster_test.go deleted file mode 100644 index 9ca4d018..00000000 --- a/tests/plugins/temporal/disaster_test.go +++ /dev/null @@ -1,114 +0,0 @@ -package tests - -import ( - "context" - "os" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "go.temporal.io/sdk/client" -) - -func Test_WorkerError_DisasterRecovery(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - p, err := os.FindProcess(int(s.workflows.Workers()[0].Pid())) - assert.NoError(t, err) - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "TimerWorkflow", - "Hello World", - ) - assert.NoError(t, err) - - time.Sleep(time.Millisecond * 750) - - // must fully recover with new worker - assert.NoError(t, p.Kill()) - - var result string - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, "hello world", result) -} - -func Test_WorkerError_DisasterRecovery_Heavy(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - defer func() { - // always restore script - _ = os.Rename("worker.bak", "worker.php") - }() - - // Makes worker pool unable to recover for some time - _ = os.Rename("worker.php", "worker.bak") - - p, err := os.FindProcess(int(s.workflows.Workers()[0].Pid())) - assert.NoError(t, err) - - // must fully recover with new worker - assert.NoError(t, p.Kill()) - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "TimerWorkflow", - "Hello World", - ) - assert.NoError(t, err) - - time.Sleep(time.Millisecond * 750) - - // restore the script and recover activity pool - _ = os.Rename("worker.bak", "worker.php") - - var result string - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, "hello world", result) -} - -func Test_ActivityError_DisasterRecovery(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - defer func() { - // always restore script - _ = os.Rename("worker.bak", "worker.php") - }() - - // Makes worker pool unable to recover for some time - _ = os.Rename("worker.php", "worker.bak") - - // destroys all workers in activities - for _, wrk := range s.activities.Workers() { - assert.NoError(t, wrk.Kill()) - } - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "SimpleWorkflow", - "Hello World", - ) - assert.NoError(t, err) - - // activity can't complete at this moment - time.Sleep(time.Millisecond * 750) - - // restore the script and recover activity pool - _ = os.Rename("worker.bak", "worker.php") - - var result string - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, "HELLO WORLD", result) -} diff --git a/tests/plugins/temporal/hp_test.go b/tests/plugins/temporal/hp_test.go deleted file mode 100644 index bceac025..00000000 --- a/tests/plugins/temporal/hp_test.go +++ /dev/null @@ -1,408 +0,0 @@ -package tests - -import ( - "context" - "crypto/rand" - "crypto/sha512" - "fmt" - "testing" - "time" - - "go.temporal.io/api/common/v1" - - "github.com/fatih/color" - "github.com/stretchr/testify/assert" - "go.temporal.io/api/enums/v1" - "go.temporal.io/api/history/v1" - "go.temporal.io/sdk/client" -) - -func init() { - color.NoColor = false -} - -func Test_VerifyRegistration(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - assert.Contains(t, s.workflows.WorkflowNames(), "SimpleWorkflow") - - assert.Contains(t, s.activities.ActivityNames(), "SimpleActivity.echo") - assert.Contains(t, s.activities.ActivityNames(), "HeartBeatActivity.doSomething") - - assert.Contains(t, s.activities.ActivityNames(), "SimpleActivity.lower") -} - -func Test_ExecuteSimpleWorkflow_1(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "SimpleWorkflow", - "Hello World", - ) - assert.NoError(t, err) - - var result string - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, "HELLO WORLD", result) -} - -type User struct { - Name string - Email string -} - -func Test_ExecuteSimpleDTOWorkflow(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "SimpleDTOWorkflow", - User{ - Name: "Antony", - Email: "[email protected]", - }, - ) - assert.NoError(t, err) - - var result struct{ Message string } - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, "Hello Antony <[email protected]>", result.Message) -} - -func Test_ExecuteSimpleWorkflowWithSequenceInBatch(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "WorkflowWithSequence", - "Hello World", - ) - assert.NoError(t, err) - - var result string - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, "OK", result) -} - -func Test_MultipleWorkflowsInSingleWorker(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "SimpleWorkflow", - "Hello World", - ) - assert.NoError(t, err) - - w2, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "TimerWorkflow", - "Hello World", - ) - assert.NoError(t, err) - - var result string - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, "HELLO WORLD", result) - - assert.NoError(t, w2.Get(context.Background(), &result)) - assert.Equal(t, "hello world", result) -} - -func Test_ExecuteWorkflowWithParallelScopes(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "ParallelScopesWorkflow", - "Hello World", - ) - assert.NoError(t, err) - - var result string - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, "HELLO WORLD|Hello World|hello world", result) -} - -func Test_Timer(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - start := time.Now() - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "TimerWorkflow", - "Hello World", - ) - assert.NoError(t, err) - - var result string - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, "hello world", result) - assert.True(t, time.Since(start).Seconds() > 1) - - s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool { - return event.EventType == enums.EVENT_TYPE_TIMER_STARTED - }) -} - -func Test_SideEffect(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "SideEffectWorkflow", - "Hello World", - ) - assert.NoError(t, err) - - var result string - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Contains(t, result, "hello world-") - - s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool { - return event.EventType == enums.EVENT_TYPE_MARKER_RECORDED - }) -} - -func Test_EmptyWorkflow(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "EmptyWorkflow", - "Hello World", - ) - assert.NoError(t, err) - - var result int - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, 42, result) -} - -func Test_PromiseChaining(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "ChainedWorkflow", - "Hello World", - ) - assert.NoError(t, err) - - var result string - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, "result:hello world", result) -} - -func Test_ActivityHeartbeat(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "SimpleHeartbeatWorkflow", - 2, - ) - assert.NoError(t, err) - - time.Sleep(time.Second) - - we, err := s.Client().DescribeWorkflowExecution(context.Background(), w.GetID(), w.GetRunID()) - assert.NoError(t, err) - assert.Len(t, we.PendingActivities, 1) - - act := we.PendingActivities[0] - - assert.Equal(t, `{"value":2}`, string(act.HeartbeatDetails.Payloads[0].Data)) - - var result string - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, "OK", result) -} - -func Test_FailedActivityHeartbeat(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "FailedHeartbeatWorkflow", - 8, - ) - assert.NoError(t, err) - - time.Sleep(time.Second) - - we, err := s.Client().DescribeWorkflowExecution(context.Background(), w.GetID(), w.GetRunID()) - assert.NoError(t, err) - assert.Len(t, we.PendingActivities, 1) - - act := we.PendingActivities[0] - - assert.Equal(t, `{"value":8}`, string(act.HeartbeatDetails.Payloads[0].Data)) - - var result string - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, "OK!", result) -} - -func Test_BinaryPayload(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - rnd := make([]byte, 2500) - - _, err := rand.Read(rnd) - assert.NoError(t, err) - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "BinaryWorkflow", - rnd, - ) - assert.NoError(t, err) - - var result string - assert.NoError(t, w.Get(context.Background(), &result)) - - assert.Equal(t, fmt.Sprintf("%x", sha512.Sum512(rnd)), result) -} - -func Test_ContinueAsNew(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "ContinuableWorkflow", - 1, - ) - assert.NoError(t, err) - - time.Sleep(time.Second) - - we, err := s.Client().DescribeWorkflowExecution(context.Background(), w.GetID(), w.GetRunID()) - assert.NoError(t, err) - - assert.Equal(t, "ContinuedAsNew", we.WorkflowExecutionInfo.Status.String()) - - time.Sleep(time.Second) - - // the result of the final workflow - var result string - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, "OK6", result) -} - -func Test_ActivityStubWorkflow(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "ActivityStubWorkflow", - "hello world", - ) - assert.NoError(t, err) - - // the result of the final workflow - var result []string - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, []string{ - "HELLO WORLD", - "invalid method call", - "UNTYPED", - }, result) -} - -func Test_ExecuteProtoWorkflow(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "ProtoPayloadWorkflow", - ) - assert.NoError(t, err) - - var result common.WorkflowExecution - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, "updated", result.RunId) - assert.Equal(t, "workflow id", result.WorkflowId) -} - -func Test_SagaWorkflow(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "SagaWorkflow", - ) - assert.NoError(t, err) - - var result string - assert.Error(t, w.Get(context.Background(), &result)) -} diff --git a/tests/plugins/temporal/query_test.go b/tests/plugins/temporal/query_test.go deleted file mode 100644 index 8b0caeee..00000000 --- a/tests/plugins/temporal/query_test.go +++ /dev/null @@ -1,66 +0,0 @@ -package tests - -import ( - "context" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "go.temporal.io/sdk/client" -) - -func Test_ListQueries(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "QueryWorkflow", - "Hello World", - ) - assert.NoError(t, err) - - time.Sleep(time.Millisecond * 500) - - v, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "error", -1) - assert.Nil(t, v) - assert.Error(t, err) - - assert.Contains(t, err.Error(), "KnownQueryTypes=[get]") - - var r int - assert.NoError(t, w.Get(context.Background(), &r)) - assert.Equal(t, 0, r) -} - -func Test_GetQuery(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "QueryWorkflow", - "Hello World", - ) - assert.NoError(t, err) - - err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "add", 88) - assert.NoError(t, err) - time.Sleep(time.Millisecond * 500) - - v, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "get", nil) - assert.NoError(t, err) - - var r int - assert.NoError(t, v.Get(&r)) - assert.Equal(t, 88, r) - - assert.NoError(t, w.Get(context.Background(), &r)) - assert.Equal(t, 88, r) -} diff --git a/tests/plugins/temporal/server_test.go b/tests/plugins/temporal/server_test.go deleted file mode 100644 index c8d815c3..00000000 --- a/tests/plugins/temporal/server_test.go +++ /dev/null @@ -1,198 +0,0 @@ -package tests - -import ( - "context" - "testing" - - endure "github.com/spiral/endure/pkg/container" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/informer" - "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/plugins/resetter" - "github.com/spiral/roadrunner/v2/plugins/rpc" - "github.com/spiral/roadrunner/v2/plugins/server" - "github.com/spiral/roadrunner/v2/plugins/temporal/activity" - rrClient "github.com/spiral/roadrunner/v2/plugins/temporal/client" - "github.com/spiral/roadrunner/v2/plugins/temporal/workflow" - "go.temporal.io/api/enums/v1" - "go.temporal.io/api/history/v1" - temporalClient "go.temporal.io/sdk/client" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" -) - -type TestServer struct { - container endure.Container - temporal rrClient.Temporal - activities *activity.Plugin - workflows *workflow.Plugin -} - -type ConfigOption struct { - Name string - Value interface{} -} - -func NewOption(name string, value interface{}) ConfigOption { - return ConfigOption{Name: name, Value: value} -} - -func NewTestServer(opt ...ConfigOption) *TestServer { - e, err := endure.NewContainer(initLogger(), endure.RetryOnFail(false)) - if err != nil { - panic(err) - } - - t := &rrClient.Plugin{} - a := &activity.Plugin{} - w := &workflow.Plugin{} - - if err := e.Register(initConfig(opt...)); err != nil { - panic(err) - } - - if err := e.Register(&logger.ZapLogger{}); err != nil { - panic(err) - } - if err := e.Register(&resetter.Plugin{}); err != nil { - panic(err) - } - if err := e.Register(&informer.Plugin{}); err != nil { - panic(err) - } - if err := e.Register(&server.Plugin{}); err != nil { - panic(err) - } - if err := e.Register(&rpc.Plugin{}); err != nil { - panic(err) - } - - if err := e.Register(t); err != nil { - panic(err) - } - if err := e.Register(a); err != nil { - panic(err) - } - if err := e.Register(w); err != nil { - panic(err) - } - - if err := e.Init(); err != nil { - panic(err) - } - - errCh, err := e.Serve() - if err != nil { - panic(err) - } - - go func() { - err := <-errCh - er := e.Stop() - if er != nil { - panic(err) - } - }() - - return &TestServer{container: e, temporal: t, activities: a, workflows: w} -} - -func (s *TestServer) Client() temporalClient.Client { - return s.temporal.GetClient() -} - -func (s *TestServer) MustClose() { - err := s.container.Stop() - if err != nil { - panic(err) - } -} - -func initConfig(opt ...ConfigOption) config.Configurer { - cfg := &config.Viper{} - cfg.Path = ".rr.yaml" - cfg.Prefix = "rr" - - return cfg -} - -func initLogger() *zap.Logger { - cfg := zap.Config{ - Level: zap.NewAtomicLevelAt(zap.ErrorLevel), - Encoding: "console", - EncoderConfig: zapcore.EncoderConfig{ - MessageKey: "message", - LevelKey: "level", - TimeKey: "time", - CallerKey: "caller", - NameKey: "name", - StacktraceKey: "stack", - EncodeLevel: zapcore.CapitalLevelEncoder, - EncodeTime: zapcore.ISO8601TimeEncoder, - EncodeCaller: zapcore.ShortCallerEncoder, - }, - OutputPaths: []string{"stderr"}, - ErrorOutputPaths: []string{"stderr"}, - } - - l, err := cfg.Build(zap.AddCaller()) - if err != nil { - panic(err) - } - - return l -} - -func (s *TestServer) AssertContainsEvent(t *testing.T, w temporalClient.WorkflowRun, assert func(*history.HistoryEvent) bool) { - i := s.Client().GetWorkflowHistory( - context.Background(), - w.GetID(), - w.GetRunID(), - false, - enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT, - ) - - for { - if !i.HasNext() { - t.Error("no more events and no match found") - break - } - - e, err := i.Next() - if err != nil { - t.Error("unable to read history event") - break - } - - if assert(e) { - break - } - } -} - -func (s *TestServer) AssertNotContainsEvent(t *testing.T, w temporalClient.WorkflowRun, assert func(*history.HistoryEvent) bool) { - i := s.Client().GetWorkflowHistory( - context.Background(), - w.GetID(), - w.GetRunID(), - false, - enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT, - ) - - for { - if !i.HasNext() { - break - } - - e, err := i.Next() - if err != nil { - t.Error("unable to read history event") - break - } - - if assert(e) { - t.Error("found unexpected event") - break - } - } -} diff --git a/tests/plugins/temporal/signal_test.go b/tests/plugins/temporal/signal_test.go deleted file mode 100644 index 51826287..00000000 --- a/tests/plugins/temporal/signal_test.go +++ /dev/null @@ -1,170 +0,0 @@ -package tests - -import ( - "context" - "testing" - "time" - - "github.com/pborman/uuid" - "github.com/stretchr/testify/assert" - "go.temporal.io/api/enums/v1" - "go.temporal.io/api/history/v1" - "go.temporal.io/sdk/client" -) - -func Test_SignalsWithoutSignals(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "SimpleSignalledWorkflow", - "Hello World", - ) - assert.NoError(t, err) - - var result int - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, 0, result) -} - -func Test_SendSignalDuringTimer(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().SignalWithStartWorkflow( - context.Background(), - "signalled-"+uuid.New(), - "add", - 10, - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "SimpleSignalledWorkflow", - ) - assert.NoError(t, err) - - err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "add", -1) - assert.NoError(t, err) - - var result int - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, 9, result) - - s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool { - if event.EventType == enums.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED { - attr := event.Attributes.(*history.HistoryEvent_WorkflowExecutionSignaledEventAttributes) - return attr.WorkflowExecutionSignaledEventAttributes.SignalName == "add" - } - - return false - }) -} - -func Test_SendSignalBeforeCompletingWorkflow(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "SimpleSignalledWorkflowWithSleep", - ) - assert.NoError(t, err) - - // should be around sleep(1) call - time.Sleep(time.Second + time.Millisecond*200) - - err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "add", -1) - assert.NoError(t, err) - - var result int - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, -1, result) - - s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool { - if event.EventType == enums.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED { - attr := event.Attributes.(*history.HistoryEvent_WorkflowExecutionSignaledEventAttributes) - return attr.WorkflowExecutionSignaledEventAttributes.SignalName == "add" - } - - return false - }) -} - -func Test_RuntimeSignal(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().SignalWithStartWorkflow( - context.Background(), - "signalled-"+uuid.New(), - "add", - -1, - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "RuntimeSignalWorkflow", - ) - assert.NoError(t, err) - - var result int - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, -1, result) - - s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool { - if event.EventType == enums.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED { - attr := event.Attributes.(*history.HistoryEvent_WorkflowExecutionSignaledEventAttributes) - return attr.WorkflowExecutionSignaledEventAttributes.SignalName == "add" - } - - return false - }) -} - -func Test_SignalSteps(t *testing.T) { - s := NewTestServer() - defer s.MustClose() - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "WorkflowWithSignalledSteps", - ) - assert.NoError(t, err) - - err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "begin", true) - assert.NoError(t, err) - - err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "next1", true) - assert.NoError(t, err) - - v, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "value", nil) - assert.NoError(t, err) - - var r int - assert.NoError(t, v.Get(&r)) - assert.Equal(t, 2, r) - - err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "next2", true) - assert.NoError(t, err) - - v, err = s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "value", nil) - assert.NoError(t, err) - - assert.NoError(t, v.Get(&r)) - assert.Equal(t, 3, r) - - var result int - assert.NoError(t, w.Get(context.Background(), &result)) - - // 3 ticks - assert.Equal(t, 3, result) -} diff --git a/tests/plugins/temporal/worker.php b/tests/plugins/temporal/worker.php deleted file mode 100644 index 0d0263e7..00000000 --- a/tests/plugins/temporal/worker.php +++ /dev/null @@ -1,33 +0,0 @@ -<?php - -declare(strict_types=1); - -require __DIR__ . '/../../vendor/autoload.php'; - -/** - * @param string $dir - * @return array<string> - */ -$getClasses = static function (string $dir): iterable { - $files = glob($dir . '/*.php'); - - foreach ($files as $file) { - yield substr(basename($file), 0, -4); - } -}; - -$factory = \Temporal\WorkerFactory::create(); - -$worker = $factory->newWorker('default'); - -// register all workflows -foreach ($getClasses(__DIR__ . '/../../temporal/Workflow') as $name) { - $worker->registerWorkflowType('Temporal\\Tests\\Workflow\\' . $name); -} - -// register all activity -foreach ($getClasses(__DIR__ . '/../../temporal/Activity') as $name) { - $worker->registerActivityType('Temporal\\Tests\\Activity\\' . $name); -} - -$factory->run(); diff --git a/tests/temporal/Activity/HeartBeatActivity.php b/tests/temporal/Activity/HeartBeatActivity.php deleted file mode 100644 index acf4a451..00000000 --- a/tests/temporal/Activity/HeartBeatActivity.php +++ /dev/null @@ -1,58 +0,0 @@ -<?php - -namespace Temporal\Tests\Activity; - -use Temporal\Activity; -use Temporal\Activity\ActivityInterface; -use Temporal\Activity\ActivityMethod; -use Temporal\Roadrunner\Internal\Error; - -#[ActivityInterface(prefix: "HeartBeatActivity.")] -class HeartBeatActivity -{ - #[ActivityMethod] - public function doSomething( - int $value - ): string { - Activity::heartbeat(['value' => $value]); - sleep($value); - return 'OK'; - } - - #[ActivityMethod] - public function slow( - string $value - ): string { - for ($i = 0; $i < 5; $i++) { - Activity::heartbeat(['value' => $i]); - sleep(1); - } - - return 'OK'; - } - - #[ActivityMethod] - public function something( - string $value - ): string { - Activity::heartbeat(['value' => $value]); - sleep($value); - return 'OK'; - } - - #[ActivityMethod] - public function failedActivity( - int $value - ): string { - Activity::heartbeat(['value' => $value]); - if (Activity::getInfo()->attempt === 1) { - throw new \Error("failed"); - } - - if (!is_array(Activity::getHeartbeatDetails())) { - throw new \Error("no heartbeat details"); - } - - return 'OK!'; - } -}
\ No newline at end of file diff --git a/tests/temporal/Activity/SimpleActivity.php b/tests/temporal/Activity/SimpleActivity.php deleted file mode 100644 index 576b126e..00000000 --- a/tests/temporal/Activity/SimpleActivity.php +++ /dev/null @@ -1,63 +0,0 @@ -<?php - -namespace Temporal\Tests\Activity; - -use Temporal\Activity\ActivityInterface; -use Temporal\Activity\ActivityMethod; -use Temporal\Api\Common\V1\WorkflowExecution; -use Temporal\DataConverter\Bytes; -use Temporal\Tests\DTO\Message; -use Temporal\Tests\DTO\User; - -#[ActivityInterface(prefix: "SimpleActivity.")] -class SimpleActivity -{ - #[ActivityMethod] - public function echo( - string $input - ): string { - return strtoupper($input); - } - - #[ActivityMethod] - public function lower( - string $input - ): string { - return strtolower($input); - } - - #[ActivityMethod] - public function greet( - User $user - ): Message { - return new Message(sprintf("Hello %s <%s>", $user->name, $user->email)); - } - - #[ActivityMethod] - public function slow( - string $input - ): string { - sleep(2); - - return strtolower($input); - } - - #[ActivityMethod] - public function sha512( - Bytes $input - ): string { - return hash("sha512", ($input->getData())); - } - - public function updateRunID(WorkflowExecution $e): WorkflowExecution - { - $e->setRunId('updated'); - return $e; - } - - #[ActivityMethod] - public function fail() - { - throw new \Error("failed activity"); - } -}
\ No newline at end of file diff --git a/tests/temporal/Client/StartNewWorkflow.php b/tests/temporal/Client/StartNewWorkflow.php deleted file mode 100644 index 67bc1d01..00000000 --- a/tests/temporal/Client/StartNewWorkflow.php +++ /dev/null @@ -1,23 +0,0 @@ -<?php - - -namespace Temporal\Tests\Client; - -use Temporal\Client; -use Temporal\Tests\Workflow\SimpleDTOWorkflow; - -use function Symfony\Component\String\s; - -class StartNewWorkflow -{ - private $stub; - - public function __construct(Client\ClientInterface $client) - { - $this->stub = $client->newWorkflowStub(SimpleDTOWorkflow::class); - } - - public function __invoke() - { - } -} diff --git a/tests/temporal/DTO/Message.php b/tests/temporal/DTO/Message.php deleted file mode 100644 index 61703fe8..00000000 --- a/tests/temporal/DTO/Message.php +++ /dev/null @@ -1,14 +0,0 @@ -<?php - - -namespace Temporal\Tests\DTO; - -class Message -{ - public string $message; - - public function __construct(string $message) - { - $this->message = $message; - } -}
\ No newline at end of file diff --git a/tests/temporal/DTO/User.php b/tests/temporal/DTO/User.php deleted file mode 100644 index cefea137..00000000 --- a/tests/temporal/DTO/User.php +++ /dev/null @@ -1,15 +0,0 @@ -<?php - - -namespace Temporal\Tests\DTO; - -use Temporal\Internal\Marshaller\Meta\Marshal; - -class User -{ - #[Marshal(name: "Name")] - public string $name; - - #[Marshal(name: "Email")] - public string $email; -}
\ No newline at end of file diff --git a/tests/temporal/Workflow/ActivityStubWorkflow.php b/tests/temporal/Workflow/ActivityStubWorkflow.php deleted file mode 100644 index 58dcdafb..00000000 --- a/tests/temporal/Workflow/ActivityStubWorkflow.php +++ /dev/null @@ -1,39 +0,0 @@ -<?php - -namespace Temporal\Tests\Workflow; - -use Temporal\Activity\ActivityOptions; -use Temporal\Tests\Activity\SimpleActivity; -use Temporal\Workflow; -use Temporal\Workflow\WorkflowMethod; - -#[Workflow\WorkflowInterface] -class ActivityStubWorkflow -{ - #[WorkflowMethod(name: 'ActivityStubWorkflow')] - public function handler( - string $input - ) { - // typed stub - $simple = Workflow::newActivityStub( - SimpleActivity::class, - ActivityOptions::new()->withStartToCloseTimeout(5) - ); - - $result = []; - $result[] = yield $simple->echo($input); - - try { - $simple->undefined($input); - } catch (\BadMethodCallException $e) { - $result[] = 'invalid method call'; - } - - // untyped stub - $untyped = Workflow::newUntypedActivityStub(ActivityOptions::new()->withStartToCloseTimeout(1)); - - $result[] = yield $untyped->execute('SimpleActivity.echo', ['untyped']); - - return $result; - } -} diff --git a/tests/temporal/Workflow/AggregatedWorkflow.php b/tests/temporal/Workflow/AggregatedWorkflow.php deleted file mode 100644 index 3299179e..00000000 --- a/tests/temporal/Workflow/AggregatedWorkflow.php +++ /dev/null @@ -1,30 +0,0 @@ -<?php - -namespace Temporal\Tests\Workflow; - -use Temporal\Workflow; -use Temporal\Workflow\SignalMethod; -use Temporal\Workflow\WorkflowInterface; -use Temporal\Workflow\WorkflowMethod; - -#[WorkflowInterface] -class AggregatedWorkflow -{ - private array $values = []; - - #[SignalMethod] - public function addValue( - string $value - ) { - $this->values[] = $value; - } - - #[WorkflowMethod(name: 'AggregatedWorkflow')] - public function run( - int $count - ) { - yield Workflow::await(fn() => count($this->values) === $count); - - return $this->values; - } -} diff --git a/tests/temporal/Workflow/AsyncActivityWorkflow.php b/tests/temporal/Workflow/AsyncActivityWorkflow.php deleted file mode 100644 index 79e45dfb..00000000 --- a/tests/temporal/Workflow/AsyncActivityWorkflow.php +++ /dev/null @@ -1,28 +0,0 @@ -<?php - -declare(strict_types=1); - -namespace Temporal\Tests\Workflow; - -use Temporal\Activity\ActivityCancellationType; -use Temporal\Activity\ActivityOptions; -use Temporal\Workflow; -use Temporal\Workflow\WorkflowMethod; -use Temporal\Tests\Activity\SimpleActivity; - -#[Workflow\WorkflowInterface] -class AsyncActivityWorkflow -{ - #[WorkflowMethod(name: 'AsyncActivityWorkflow')] - public function handler() - { - $simple = Workflow::newActivityStub( - SimpleActivity::class, - ActivityOptions::new() - ->withStartToCloseTimeout(20) - ->withCancellationType(ActivityCancellationType::WAIT_CANCELLATION_COMPLETED) - ); - - return yield $simple->external(); - } -} diff --git a/tests/temporal/Workflow/BinaryWorkflow.php b/tests/temporal/Workflow/BinaryWorkflow.php deleted file mode 100644 index ed1952ad..00000000 --- a/tests/temporal/Workflow/BinaryWorkflow.php +++ /dev/null @@ -1,21 +0,0 @@ -<?php - -namespace Temporal\Tests\Workflow; - -use Temporal\Activity\ActivityOptions; -use Temporal\DataConverter\Bytes; -use Temporal\Workflow; -use Temporal\Workflow\WorkflowMethod; - -#[Workflow\WorkflowInterface] -class BinaryWorkflow -{ - #[WorkflowMethod(name: 'BinaryWorkflow')] - public function handler( - Bytes $input - ): iterable { - $opts = ActivityOptions::new()->withStartToCloseTimeout(5); - - return yield Workflow::executeActivity('SimpleActivity.sha512', [$input], $opts); - } -} diff --git a/tests/temporal/Workflow/CancelSignalledChildWorkflow.php b/tests/temporal/Workflow/CancelSignalledChildWorkflow.php deleted file mode 100644 index e2e43efa..00000000 --- a/tests/temporal/Workflow/CancelSignalledChildWorkflow.php +++ /dev/null @@ -1,57 +0,0 @@ -<?php - -namespace Temporal\Tests\Workflow; - -use React\Promise\Deferred; -use Temporal\Workflow; -use Temporal\Workflow\WorkflowMethod; - -#[Workflow\WorkflowInterface] -class CancelSignalledChildWorkflow -{ - private array $status = []; - - #[Workflow\QueryMethod(name: 'getStatus')] - public function getStatus(): array - { - return $this->status; - } - - #[WorkflowMethod(name: 'CancelSignalledChildWorkflow')] - public function handler() - { - // typed stub - $simple = Workflow::newChildWorkflowStub(SimpleSignalledWorkflow::class); - - $waitSignalled = new Deferred(); - - $this->status[] = 'start'; - - // start execution - $scope = Workflow::newCancellationScope( - function () use ($simple, $waitSignalled) { - $call = $simple->handler(); - $this->status[] = 'child started'; - - yield $simple->add(8); - $this->status[] = 'child signalled'; - $waitSignalled->resolve(); - - return yield $call; - } - ); - - // only cancel scope when signal dispatched - yield $waitSignalled; - $scope->cancel(); - $this->status[] = 'scope cancelled'; - - try { - return yield $scope; - } catch (\Throwable $e) { - $this->status[] = 'process done'; - - return 'cancelled ok'; - } - } -} diff --git a/tests/temporal/Workflow/CanceledHeartbeatWorkflow.php b/tests/temporal/Workflow/CanceledHeartbeatWorkflow.php deleted file mode 100644 index 6b463192..00000000 --- a/tests/temporal/Workflow/CanceledHeartbeatWorkflow.php +++ /dev/null @@ -1,29 +0,0 @@ -<?php - -declare(strict_types=1); - -namespace Temporal\Tests\Workflow; - -use Temporal\Activity\ActivityCancellationType; -use Temporal\Activity\ActivityOptions; -use Temporal\Workflow; -use Temporal\Workflow\WorkflowMethod; -use Temporal\Tests\Activity\HeartBeatActivity; - -#[Workflow\WorkflowInterface] -class CanceledHeartbeatWorkflow -{ - #[WorkflowMethod(name: 'CanceledHeartbeatWorkflow')] - public function handler(): iterable - { - $act = Workflow::newActivityStub( - HeartBeatActivity::class, - ActivityOptions::new() - ->withStartToCloseTimeout(50) - ->withCancellationType(ActivityCancellationType::WAIT_CANCELLATION_COMPLETED) - ->withHeartbeatTimeout(1) - ); - - return yield $act->slow('test'); - } -} diff --git a/tests/temporal/Workflow/CancelledMidflightWorkflow.php b/tests/temporal/Workflow/CancelledMidflightWorkflow.php deleted file mode 100644 index ea799ce1..00000000 --- a/tests/temporal/Workflow/CancelledMidflightWorkflow.php +++ /dev/null @@ -1,47 +0,0 @@ -<?php - -namespace Temporal\Tests\Workflow; - -use Temporal\Activity\ActivityOptions; -use Temporal\Tests\Activity\SimpleActivity; -use Temporal\Workflow; -use Temporal\Workflow\WorkflowMethod; - -#[Workflow\WorkflowInterface] -class CancelledMidflightWorkflow -{ - private array $status = []; - - #[Workflow\QueryMethod(name: 'getStatus')] - public function getStatus(): array - { - return $this->status; - } - - #[WorkflowMethod(name: 'CancelledMidflightWorkflow')] - public function handler() - { - $simple = Workflow::newActivityStub( - SimpleActivity::class, - ActivityOptions::new()->withStartToCloseTimeout(5) - ); - - $this->status[] = 'start'; - - $scope = Workflow::newCancellationScope( - function () use ($simple) { - $this->status[] = 'in scope'; - $simple->slow('1'); - } - )->onCancel( - function () { - $this->status[] = 'on cancel'; - } - ); - - $scope->cancel(); - $this->status[] = 'done cancel'; - - return 'OK'; - } -} diff --git a/tests/temporal/Workflow/CancelledNestedWorkflow.php b/tests/temporal/Workflow/CancelledNestedWorkflow.php deleted file mode 100644 index 0c82f761..00000000 --- a/tests/temporal/Workflow/CancelledNestedWorkflow.php +++ /dev/null @@ -1,72 +0,0 @@ -<?php - -namespace Temporal\Tests\Workflow; - -use Temporal\Exception\Failure\CanceledFailure; -use Temporal\Workflow; -use Temporal\Workflow\WorkflowMethod; - -#[Workflow\WorkflowInterface] -class CancelledNestedWorkflow -{ - private array $status = []; - - #[Workflow\QueryMethod(name: 'getStatus')] - public function getStatus(): array - { - return $this->status; - } - - #[WorkflowMethod(name: 'CancelledNestedWorkflow')] - public function handler() - { - $this->status[] = 'begin'; - try { - yield Workflow::newCancellationScope( - function () { - $this->status[] = 'first scope'; - - $scope = Workflow::newCancellationScope( - function () { - $this->status[] = 'second scope'; - - try { - yield Workflow::timer(2); - } catch (CanceledFailure $e) { - $this->status[] = 'second scope cancelled'; - throw $e; - } - - $this->status[] = 'second scope done'; - } - )->onCancel( - function () { - $this->status[] = 'close second scope'; - } - ); - - try { - yield Workflow::timer(1); - } catch (CanceledFailure $e) { - $this->status[] = 'first scope cancelled'; - throw $e; - } - - $this->status[] = 'first scope done'; - - yield $scope; - } - )->onCancel( - function () { - $this->status[] = 'close first scope'; - } - ); - } catch (CanceledFailure $e) { - $this->status[] = 'close process'; - - return 'CANCELLED'; - } - - return 'OK'; - } -} diff --git a/tests/temporal/Workflow/CancelledScopeWorkflow.php b/tests/temporal/Workflow/CancelledScopeWorkflow.php deleted file mode 100644 index 50e0992f..00000000 --- a/tests/temporal/Workflow/CancelledScopeWorkflow.php +++ /dev/null @@ -1,39 +0,0 @@ -<?php - -namespace Temporal\Tests\Workflow; - -use Temporal\Activity\ActivityOptions; -use Temporal\Workflow; -use Temporal\Workflow\WorkflowMethod; -use Temporal\Tests\Activity\SimpleActivity; - -#[Workflow\WorkflowInterface] -class CancelledScopeWorkflow -{ - #[WorkflowMethod(name: 'CancelledScopeWorkflow')] - public function handler() - { - $simple = Workflow::newActivityStub( - SimpleActivity::class, - ActivityOptions::new()->withStartToCloseTimeout(5) - ); - - $cancelled = 'not'; - - $scope = Workflow::newCancellationScope( - function () use ($simple) { - yield Workflow::timer(2); - yield $simple->slow('hello'); - } - )->onCancel( - function () use (&$cancelled) { - $cancelled = 'yes'; - } - ); - - yield Workflow::timer(1); - $scope->cancel(); - - return $cancelled; - } -} diff --git a/tests/temporal/Workflow/CancelledSingleScopeWorkflow.php b/tests/temporal/Workflow/CancelledSingleScopeWorkflow.php deleted file mode 100644 index 5fe8d3d8..00000000 --- a/tests/temporal/Workflow/CancelledSingleScopeWorkflow.php +++ /dev/null @@ -1,55 +0,0 @@ -<?php - -namespace Temporal\Tests\Workflow; - -use Temporal\Activity\ActivityOptions; -use Temporal\Exception\Failure\CanceledFailure; -use Temporal\Workflow; -use Temporal\Workflow\WorkflowMethod; -use Temporal\Tests\Activity\SimpleActivity; - -#[Workflow\WorkflowInterface] -class CancelledSingleScopeWorkflow -{ - private array $status = []; - - #[Workflow\QueryMethod(name: 'getStatus')] - public function getStatus(): array - { - return $this->status; - } - - #[WorkflowMethod(name: 'CancelledSingleScopeWorkflow')] - public function handler() - { - $simple = Workflow::newActivityStub( - SimpleActivity::class, - ActivityOptions::new() - ->withStartToCloseTimeout(5) - ); - - $this->status[] = 'start'; - try { - yield Workflow::newCancellationScope( - function () use ($simple) { - try { - $this->status[] = 'in scope'; - yield $simple->slow('1'); - } catch (CanceledFailure $e) { - // after process is complete, do not use for business logic - $this->status[] = 'captured in scope'; - throw $e; - } - } - )->onCancel( - function () { - $this->status[] = 'on cancel'; - } - ); - } catch (CanceledFailure $e) { - $this->status[] = 'captured in process'; - } - - return 'OK'; - } -} diff --git a/tests/temporal/Workflow/CancelledWithCompensationWorkflow.php b/tests/temporal/Workflow/CancelledWithCompensationWorkflow.php deleted file mode 100644 index 2074aac1..00000000 --- a/tests/temporal/Workflow/CancelledWithCompensationWorkflow.php +++ /dev/null @@ -1,79 +0,0 @@ -<?php - -namespace Temporal\Tests\Workflow; - -use Temporal\Activity\ActivityOptions; -use Temporal\Exception\Failure\CanceledFailure; -use Temporal\Tests\Activity\SimpleActivity; -use Temporal\Workflow; -use Temporal\Workflow\WorkflowMethod; - -#[Workflow\WorkflowInterface] -class CancelledWithCompensationWorkflow -{ - private array $status = []; - - #[Workflow\QueryMethod(name: 'getStatus')] - public function getStatus(): array - { - return $this->status; - } - - #[WorkflowMethod(name: 'CancelledWithCompensationWorkflow')] - public function handler() - { - $simple = Workflow::newActivityStub( - SimpleActivity::class, - ActivityOptions::new()->withStartToCloseTimeout(5) - ); - - // waits for 2 seconds - $slow = $simple->slow('DOING SLOW ACTIVITY'); - - try { - $this->status[] = 'yield'; - $result = yield $slow; - } catch (CanceledFailure $e) { - $this->status[] = 'rollback'; - - try { - // must fail again - $result = yield $slow; - } catch (CanceledFailure $e) { - $this->status[] = 'captured retry'; - } - - try { - // fail since on cancelled context - $result = yield $simple->echo('echo must fail'); - } catch (CanceledFailure $e) { - $this->status[] = 'captured promise on cancelled'; - } - - $scope = Workflow::newDetachedCancellationScope( - function () use ($simple) { - $this->status[] = 'START rollback'; - - $second = yield $simple->echo('rollback'); - - $this->status[] = sprintf("RESULT (%s)", $second); - - if ($second !== 'ROLLBACK') { - $this->status[] = 'FAIL rollback'; - return 'failed to compensate ' . $second; - } - $this->status[] = 'DONE rollback'; - - return 'OK'; - } - ); - - $this->status[] = 'WAIT ROLLBACK'; - $result = yield $scope; - $this->status[] = 'COMPLETE rollback'; - } - - $this->status[] = 'result: ' . $result; - return $result; - } -} diff --git a/tests/temporal/Workflow/CancelledWorkflow.php b/tests/temporal/Workflow/CancelledWorkflow.php deleted file mode 100644 index be9f7542..00000000 --- a/tests/temporal/Workflow/CancelledWorkflow.php +++ /dev/null @@ -1,31 +0,0 @@ -<?php - -namespace Temporal\Tests\Workflow; - -use Temporal\Activity\ActivityOptions; -use Temporal\Exception\Failure\CanceledFailure; -use Temporal\Tests\Activity\SimpleActivity; -use Temporal\Workflow; -use Temporal\Workflow\WorkflowMethod; - -#[Workflow\WorkflowInterface] -class CancelledWorkflow -{ - #[WorkflowMethod(name: 'CancelledWorkflow')] - public function handler() - { - $simple = Workflow::newActivityStub( - SimpleActivity::class, - ActivityOptions::new()->withStartToCloseTimeout(5) - ); - - // waits for 2 seconds - $slow = $simple->slow('DOING SLOW ACTIVITY'); - - try { - return yield $slow; - } catch (CanceledFailure $e) { - return "CANCELLED"; - } - } -} diff --git a/tests/temporal/Workflow/ChainedWorkflow.php b/tests/temporal/Workflow/ChainedWorkflow.php deleted file mode 100644 index ba9c8f96..00000000 --- a/tests/temporal/Workflow/ChainedWorkflow.php +++ /dev/null @@ -1,31 +0,0 @@ -<?php - -declare(strict_types=1); - -namespace Temporal\Tests\Workflow; - -use Temporal\Activity\ActivityOptions; -use Temporal\Workflow; -use Temporal\Workflow\WorkflowMethod; - -#[Workflow\WorkflowInterface] -class ChainedWorkflow -{ - #[WorkflowMethod(name: 'ChainedWorkflow')] - public function handler(string $input): iterable - { - $opts = ActivityOptions::new()->withStartToCloseTimeout(5); - - return yield Workflow::executeActivity( - 'SimpleActivity.echo', - [$input], - $opts - )->then(function ($result) use ($opts) { - return Workflow::executeActivity( - 'SimpleActivity.lower', - ['Result:' . $result], - $opts - ); - }); - } -} diff --git a/tests/temporal/Workflow/ChildStubWorkflow.php b/tests/temporal/Workflow/ChildStubWorkflow.php deleted file mode 100644 index 608962c2..00000000 --- a/tests/temporal/Workflow/ChildStubWorkflow.php +++ /dev/null @@ -1,30 +0,0 @@ -<?php - -namespace Temporal\Tests\Workflow; - -use Temporal\Workflow; -use Temporal\Workflow\WorkflowMethod; - -#[Workflow\WorkflowInterface] -class ChildStubWorkflow -{ - #[WorkflowMethod(name: 'ChildStubWorkflow')] - public function handler( - string $input - ) { - // typed stub - $simple = Workflow::newChildWorkflowStub(SimpleWorkflow::class); - - $result = []; - $result[] = yield $simple->handler($input); - - // untyped - $untyped = Workflow::newUntypedChildWorkflowStub('SimpleWorkflow'); - $result[] = yield $untyped->execute(['untyped']); - - $execution = yield $untyped->getExecution(); - assert($execution instanceof Workflow\WorkflowExecution); - - return $result; - } -} diff --git a/tests/temporal/Workflow/ComplexExceptionalWorkflow.php b/tests/temporal/Workflow/ComplexExceptionalWorkflow.php deleted file mode 100644 index bf65ccb2..00000000 --- a/tests/temporal/Workflow/ComplexExceptionalWorkflow.php +++ /dev/null @@ -1,26 +0,0 @@ -<?php - -namespace Temporal\Tests\Workflow; - -use Temporal\Activity\ActivityOptions; -use Temporal\Common\RetryOptions; -use Temporal\Tests\Activity\SimpleActivity; -use Temporal\Workflow; -use Temporal\Workflow\WorkflowMethod; - -#[Workflow\WorkflowInterface] -class ComplexExceptionalWorkflow -{ - #[WorkflowMethod(name: 'ComplexExceptionalWorkflow')] - public function handler() - { - $child = Workflow::newChildWorkflowStub( - ExceptionalActivityWorkflow::class, - Workflow\ChildWorkflowOptions::new()->withRetryOptions( - (new RetryOptions())->withMaximumAttempts(1) - ) - ); - - return yield $child->handler(); - } -} diff --git a/tests/temporal/Workflow/ContinuableWorkflow.php b/tests/temporal/Workflow/ContinuableWorkflow.php deleted file mode 100644 index 78411414..00000000 --- a/tests/temporal/Workflow/ContinuableWorkflow.php +++ /dev/null @@ -1,38 +0,0 @@ -<?php - - -namespace Temporal\Tests\Workflow; - -use Temporal\Activity\ActivityOptions; -use Temporal\Workflow; -use Temporal\Workflow\WorkflowMethod; -use Temporal\Tests\Activity\SimpleActivity; - -#[Workflow\WorkflowInterface] -class ContinuableWorkflow -{ - #[WorkflowMethod(name: 'ContinuableWorkflow')] - public function handler( - int $generation - ) { - $simple = Workflow::newActivityStub( - SimpleActivity::class, - ActivityOptions::new()->withStartToCloseTimeout(5) - ); - - if ($generation > 5) { - // complete - return "OK" . $generation; - } - - if ($generation !== 1) { - assert(!empty(Workflow::getInfo()->continuedExecutionRunId)); - } - - for ($i = 0; $i < $generation; $i++) { - yield $simple->echo((string)$generation); - } - - return Workflow::continueAsNew('ContinuableWorkflow', [++$generation]); - } -} diff --git a/tests/temporal/Workflow/EmptyWorkflow.php b/tests/temporal/Workflow/EmptyWorkflow.php deleted file mode 100644 index 57fb5e65..00000000 --- a/tests/temporal/Workflow/EmptyWorkflow.php +++ /dev/null @@ -1,16 +0,0 @@ -<?php - -namespace Temporal\Tests\Workflow; - -use Temporal\Workflow\WorkflowMethod; -use Temporal\Workflow; - -#[Workflow\WorkflowInterface] -class EmptyWorkflow -{ - #[WorkflowMethod] - public function handler() - { - return 42; - } -} diff --git a/tests/temporal/Workflow/ExceptionalActivityWorkflow.php b/tests/temporal/Workflow/ExceptionalActivityWorkflow.php deleted file mode 100644 index e0ed0005..00000000 --- a/tests/temporal/Workflow/ExceptionalActivityWorkflow.php +++ /dev/null @@ -1,25 +0,0 @@ -<?php - -namespace Temporal\Tests\Workflow; - -use Temporal\Activity\ActivityOptions; -use Temporal\Common\RetryOptions; -use Temporal\Tests\Activity\SimpleActivity; -use Temporal\Workflow; -use Temporal\Workflow\WorkflowMethod; - -#[Workflow\WorkflowInterface] -class ExceptionalActivityWorkflow -{ - #[WorkflowMethod(name: 'ExceptionalActivityWorkflow')] - public function handler() - { - $simple = Workflow::newActivityStub( - SimpleActivity::class, - ActivityOptions::new()->withStartToCloseTimeout(5) - ->withRetryOptions((new RetryOptions())->withMaximumAttempts(1)) - ); - - return yield $simple->fail(); - } -} diff --git a/tests/temporal/Workflow/ExceptionalWorkflow.php b/tests/temporal/Workflow/ExceptionalWorkflow.php deleted file mode 100644 index 9a3e907f..00000000 --- a/tests/temporal/Workflow/ExceptionalWorkflow.php +++ /dev/null @@ -1,18 +0,0 @@ -<?php - -namespace Temporal\Tests\Workflow; - -use Temporal\Activity\ActivityOptions; -use Temporal\Tests\Activity\SimpleActivity; -use Temporal\Workflow; -use Temporal\Workflow\WorkflowMethod; - -#[Workflow\WorkflowInterface] -class ExceptionalWorkflow -{ - #[WorkflowMethod(name: 'ExceptionalWorkflow')] - public function handler() - { - throw new \RuntimeException("workflow error"); - } -} diff --git a/tests/temporal/Workflow/FailedHeartbeatWorkflow.php b/tests/temporal/Workflow/FailedHeartbeatWorkflow.php deleted file mode 100644 index e857f100..00000000 --- a/tests/temporal/Workflow/FailedHeartbeatWorkflow.php +++ /dev/null @@ -1,30 +0,0 @@ -<?php - -declare(strict_types=1); - -namespace Temporal\Tests\Workflow; - -use Temporal\Activity\ActivityOptions; -use Temporal\Common\RetryOptions; -use Temporal\Workflow; -use Temporal\Workflow\WorkflowMethod; -use Temporal\Tests\Activity\HeartBeatActivity; - -#[Workflow\WorkflowInterface] -class FailedHeartbeatWorkflow -{ - #[WorkflowMethod(name: 'FailedHeartbeatWorkflow')] - public function handler( - int $iterations - ): iterable { - $act = Workflow::newActivityStub( - HeartBeatActivity::class, - ActivityOptions::new() - ->withStartToCloseTimeout(50) - // will fail on first attempt - ->withRetryOptions(RetryOptions::new()->withMaximumAttempts(2)) - ); - - return yield $act->failedActivity($iterations); - } -} diff --git a/tests/temporal/Workflow/LoopWithSignalCoroutinesWorkflow.php b/tests/temporal/Workflow/LoopWithSignalCoroutinesWorkflow.php deleted file mode 100644 index c389fd78..00000000 --- a/tests/temporal/Workflow/LoopWithSignalCoroutinesWorkflow.php +++ /dev/null @@ -1,55 +0,0 @@ -<?php - -namespace Temporal\Tests\Workflow; - -use Temporal\Activity\ActivityOptions; -use Temporal\Tests\Activity\SimpleActivity; -use Temporal\Workflow; -use Temporal\Workflow\SignalMethod; -use Temporal\Workflow\WorkflowMethod; - -#[Workflow\WorkflowInterface] -class LoopWithSignalCoroutinesWorkflow -{ - private array $values = []; - private array $result = []; - private $simple; - - public function __construct() - { - $this->simple = Workflow::newActivityStub( - SimpleActivity::class, - ActivityOptions::new()->withStartToCloseTimeout(5) - ); - } - - #[SignalMethod] - public function addValue( - string $value - ) { - $value = yield $this->simple->prefix('in signal ', $value); - $value = yield $this->simple->prefix('in signal 2 ', $value); - - $this->values[] = $value; - } - - #[WorkflowMethod(name: 'LoopWithSignalCoroutinesWorkflow')] - public function run( - int $count - ) { - while (true) { - yield Workflow::await(fn() => $this->values !== []); - $value = array_shift($this->values); - - // uppercases - $this->result[] = yield $this->simple->echo($value); - - if (count($this->result) === $count) { - break; - } - } - - asort($this->result); - return array_values($this->result); - } -} diff --git a/tests/temporal/Workflow/LoopWorkflow.php b/tests/temporal/Workflow/LoopWorkflow.php deleted file mode 100644 index 97d7a3aa..00000000 --- a/tests/temporal/Workflow/LoopWorkflow.php +++ /dev/null @@ -1,51 +0,0 @@ -<?php - - -namespace Temporal\Tests\Workflow; - -use Temporal\Activity\ActivityOptions; -use Temporal\Tests\Activity\SimpleActivity; -use Temporal\Workflow; -use Temporal\Workflow\SignalMethod; -use Temporal\Workflow\WorkflowMethod; - -#[Workflow\WorkflowInterface] -class LoopWorkflow -{ - private array $values = []; - private array $result = []; - private $simple; - - public function __construct() - { - $this->simple = Workflow::newActivityStub( - SimpleActivity::class, - ActivityOptions::new()->withStartToCloseTimeout(5) - ); - } - - #[SignalMethod] - public function addValue( - string $value - ) { - $this->values[] = $value; - } - - #[WorkflowMethod(name: 'LoopWorkflow')] - public function run( - int $count - ) { - while (true) { - yield Workflow::await(fn() => $this->values !== []); - $value = array_shift($this->values); - - $this->result[] = yield $this->simple->echo($value); - - if (count($this->result) === $count) { - break; - } - } - - return $this->result; - } -} diff --git a/tests/temporal/Workflow/ParallelScopesWorkflow.php b/tests/temporal/Workflow/ParallelScopesWorkflow.php deleted file mode 100644 index 8a2303f4..00000000 --- a/tests/temporal/Workflow/ParallelScopesWorkflow.php +++ /dev/null @@ -1,36 +0,0 @@ -<?php - -declare(strict_types=1); - -namespace Temporal\Tests\Workflow; - -use Temporal\Activity\ActivityOptions; -use Temporal\Promise; -use Temporal\Workflow; -use Temporal\Workflow\WorkflowMethod; -use Temporal\Tests\Activity\SimpleActivity; - -#[Workflow\WorkflowInterface] -class ParallelScopesWorkflow -{ - #[WorkflowMethod(name: 'ParallelScopesWorkflow')] - public function handler(string $input) - { - $simple = Workflow::newActivityStub( - SimpleActivity::class, - ActivityOptions::new()->withStartToCloseTimeout(5) - ); - - $a = Workflow::newCancellationScope(function () use ($simple, $input) { - return yield $simple->echo($input); - }); - - $b = Workflow::newCancellationScope(function () use ($simple, $input) { - return yield $simple->lower($input); - }); - - [$ra, $rb] = yield Promise::all([$a, $b]); - - return sprintf('%s|%s|%s', $ra, $input, $rb); - } -} diff --git a/tests/temporal/Workflow/PeriodicWorkflow.php b/tests/temporal/Workflow/PeriodicWorkflow.php deleted file mode 100644 index 08f5f2fa..00000000 --- a/tests/temporal/Workflow/PeriodicWorkflow.php +++ /dev/null @@ -1,19 +0,0 @@ -<?php - -namespace Temporal\Tests\Workflow; - -use Temporal\Workflow; -use Temporal\Workflow\WorkflowMethod; - -#[Workflow\WorkflowInterface] -class PeriodicWorkflow -{ - #[WorkflowMethod(name: 'PeriodicWorkflow')] - public function handler() - { - error_log("GOT SOMETHING" . print_r(Workflow::getLastCompletionResult(), true)); - - // todo: get last completion result - return 'OK'; - } -} diff --git a/tests/temporal/Workflow/ProtoPayloadWorkflow.php b/tests/temporal/Workflow/ProtoPayloadWorkflow.php deleted file mode 100644 index 7adbed1e..00000000 --- a/tests/temporal/Workflow/ProtoPayloadWorkflow.php +++ /dev/null @@ -1,33 +0,0 @@ -<?php - -namespace Temporal\Tests\Workflow; - -use Temporal\Activity\ActivityOptions; -use Temporal\Api\Common\V1\WorkflowExecution; -use Temporal\Tests\Activity\SimpleActivity; -use Temporal\Workflow; -use Temporal\Workflow\WorkflowMethod; - -#[Workflow\WorkflowInterface] -class ProtoPayloadWorkflow -{ - #[WorkflowMethod(name: 'ProtoPayloadWorkflow')] - public function handler(): iterable - { - $simple = Workflow::newActivityStub( - SimpleActivity::class, - ActivityOptions::new()->withStartToCloseTimeout(5) - ); - - $e = new WorkflowExecution(); - $e->setWorkflowId('workflow id'); - $e->setRunId('run id'); - - /** @var WorkflowExecution $e2 */ - $e2 = yield $simple->updateRunID($e); - assert($e2->getWorkflowId() === $e->getWorkflowId()); - assert($e2->getRunId() === 'updated'); - - return $e2; - } -} diff --git a/tests/temporal/Workflow/QueryWorkflow.php b/tests/temporal/Workflow/QueryWorkflow.php deleted file mode 100644 index 96e41582..00000000 --- a/tests/temporal/Workflow/QueryWorkflow.php +++ /dev/null @@ -1,41 +0,0 @@ -<?php - -/** - * This file is part of Temporal package. - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -namespace Temporal\Tests\Workflow; - -use Temporal\Workflow; -use Temporal\Workflow\WorkflowMethod; - -#[Workflow\WorkflowInterface] -class QueryWorkflow -{ - private int $counter = 0; - - #[Workflow\SignalMethod(name: "add")] - public function add( - int $value - ) { - $this->counter += $value; - } - - #[Workflow\QueryMethod(name: "get")] - public function get(): int - { - return $this->counter; - } - - #[WorkflowMethod] - public function handler() - { - // collect signals during one second - yield Workflow::timer(1); - - return $this->counter; - } -} diff --git a/tests/temporal/Workflow/RuntimeSignalWorkflow.php b/tests/temporal/Workflow/RuntimeSignalWorkflow.php deleted file mode 100644 index f700af72..00000000 --- a/tests/temporal/Workflow/RuntimeSignalWorkflow.php +++ /dev/null @@ -1,31 +0,0 @@ -<?php - -namespace Temporal\Tests\Workflow; - -use React\Promise\Deferred; -use Temporal\Workflow; -use Temporal\Workflow\WorkflowMethod; - -#[Workflow\WorkflowInterface] -class RuntimeSignalWorkflow -{ - #[WorkflowMethod] - public function handler() - { - $wait1 = new Deferred(); - $wait2 = new Deferred(); - - $counter = 0; - - Workflow::registerSignal('add', function ($value) use (&$counter, $wait1, $wait2) { - $counter += $value; - $wait1->resolve($value); - $wait2->resolve($value); - }); - - yield $wait1; - yield $wait2; - - return $counter; - } -} diff --git a/tests/temporal/Workflow/SagaWorkflow.php b/tests/temporal/Workflow/SagaWorkflow.php deleted file mode 100644 index e47c0203..00000000 --- a/tests/temporal/Workflow/SagaWorkflow.php +++ /dev/null @@ -1,54 +0,0 @@ -<?php - -/** - * This file is part of Temporal package. - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -namespace Temporal\Tests\Workflow; - -use Temporal\Activity\ActivityOptions; -use Temporal\Common\RetryOptions; -use Temporal\Tests\Activity\SimpleActivity; -use Temporal\Workflow; - -#[Workflow\WorkflowInterface] -class SagaWorkflow -{ - #[Workflow\WorkflowMethod(name: 'SagaWorkflow')] - public function run() - { - $simple = Workflow::newActivityStub( - SimpleActivity::class, - ActivityOptions::new() - ->withStartToCloseTimeout(60) - ->withRetryOptions(RetryOptions::new()->withMaximumAttempts(1)) - ); - - $saga = new Workflow\Saga(); - $saga->setParallelCompensation(true); - - try { - yield $simple->echo('test'); - $saga->addCompensation( - function () use ($simple) { - yield $simple->echo('compensate echo'); - } - ); - - yield $simple->lower('TEST'); - $saga->addCompensation( - function () use ($simple) { - yield $simple->lower('COMPENSATE LOWER'); - } - ); - - yield $simple->fail(); - } catch (\Throwable $e) { - yield $saga->compensate(); - throw $e; - } - } -} diff --git a/tests/temporal/Workflow/SideEffectWorkflow.php b/tests/temporal/Workflow/SideEffectWorkflow.php deleted file mode 100644 index 95d396e4..00000000 --- a/tests/temporal/Workflow/SideEffectWorkflow.php +++ /dev/null @@ -1,30 +0,0 @@ -<?php - -namespace Temporal\Tests\Workflow; - -use Temporal\Activity\ActivityOptions; -use Temporal\Common\Uuid; -use Temporal\Workflow; -use Temporal\Workflow\WorkflowMethod; -use Temporal\Tests\Activity\SimpleActivity; - -#[Workflow\WorkflowInterface] -class SideEffectWorkflow -{ - #[WorkflowMethod(name: 'SideEffectWorkflow')] - public function handler(string $input): iterable - { - $simple = Workflow::newActivityStub( - SimpleActivity::class, - ActivityOptions::new()->withStartToCloseTimeout(5) - ); - - $result = yield Workflow::sideEffect( - function () use ($input) { - return $input . '-42'; - } - ); - - return yield $simple->lower($result); - } -} diff --git a/tests/temporal/Workflow/SignalChildViaStubWorkflow.php b/tests/temporal/Workflow/SignalChildViaStubWorkflow.php deleted file mode 100644 index 828086fc..00000000 --- a/tests/temporal/Workflow/SignalChildViaStubWorkflow.php +++ /dev/null @@ -1,25 +0,0 @@ -<?php - -namespace Temporal\Tests\Workflow; - -use Temporal\Workflow; -use Temporal\Workflow\WorkflowMethod; - -#[Workflow\WorkflowInterface] -class SignalChildViaStubWorkflow -{ - #[WorkflowMethod(name: 'SignalChildViaStubWorkflow')] - public function handler() - { - // typed stub - $simple = Workflow::newChildWorkflowStub(SimpleSignalledWorkflow::class); - - // start execution - $call = $simple->handler(); - - yield $simple->add(8); - - // expects 8 - return yield $call; - } -} diff --git a/tests/temporal/Workflow/SimpleDTOWorkflow.php b/tests/temporal/Workflow/SimpleDTOWorkflow.php deleted file mode 100644 index bd39a0a0..00000000 --- a/tests/temporal/Workflow/SimpleDTOWorkflow.php +++ /dev/null @@ -1,35 +0,0 @@ -<?php - -declare(strict_types=1); - -namespace Temporal\Tests\Workflow; - -use Temporal\Activity\ActivityOptions; -use Temporal\Workflow; -use Temporal\Workflow\WorkflowMethod; -use Temporal\Tests\Activity\SimpleActivity; -use Temporal\Tests\DTO\Message; -use Temporal\Tests\DTO\User; - -#[Workflow\WorkflowInterface] -class SimpleDTOWorkflow -{ - #[WorkflowMethod(name: 'SimpleDTOWorkflow')]//, returnType: Message::class)] - public function handler( - User $user - ) { - $simple = Workflow::newActivityStub( - SimpleActivity::class, - ActivityOptions::new() - ->withStartToCloseTimeout(5) - ); - - $value = yield $simple->greet($user); - - if (!$value instanceof Message) { - return "FAIL"; - } - - return $value; - } -} diff --git a/tests/temporal/Workflow/SimpleHeartbeatWorkflow.php b/tests/temporal/Workflow/SimpleHeartbeatWorkflow.php deleted file mode 100644 index c9999cd1..00000000 --- a/tests/temporal/Workflow/SimpleHeartbeatWorkflow.php +++ /dev/null @@ -1,25 +0,0 @@ -<?php - -declare(strict_types=1); - -namespace Temporal\Tests\Workflow; - -use Temporal\Activity\ActivityOptions; -use Temporal\Workflow; -use Temporal\Workflow\WorkflowMethod; -use Temporal\Tests\Activity\HeartBeatActivity; - -#[Workflow\WorkflowInterface] -class SimpleHeartbeatWorkflow -{ - #[WorkflowMethod(name: 'SimpleHeartbeatWorkflow')] - public function handler(int $iterations): iterable - { - $act = Workflow::newActivityStub( - HeartBeatActivity::class, - ActivityOptions::new()->withStartToCloseTimeout(50) - ); - - return yield $act->doSomething($iterations); - } -} diff --git a/tests/temporal/Workflow/SimpleSignalledWorkflow.php b/tests/temporal/Workflow/SimpleSignalledWorkflow.php deleted file mode 100644 index 0df25a65..00000000 --- a/tests/temporal/Workflow/SimpleSignalledWorkflow.php +++ /dev/null @@ -1,30 +0,0 @@ -<?php - -declare(strict_types=1); - -namespace Temporal\Tests\Workflow; - -use Temporal\Workflow; -use Temporal\Workflow\WorkflowMethod; - -#[Workflow\WorkflowInterface] -class SimpleSignalledWorkflow -{ - private $counter = 0; - - #[Workflow\SignalMethod(name: "add")] - public function add( - int $value - ) { - $this->counter += $value; - } - - #[WorkflowMethod(name: 'SimpleSignalledWorkflow')] - public function handler(): iterable - { - // collect signals during one second - yield Workflow::timer(1); - - return $this->counter; - } -} diff --git a/tests/temporal/Workflow/SimpleSignalledWorkflowWithSleep.php b/tests/temporal/Workflow/SimpleSignalledWorkflowWithSleep.php deleted file mode 100644 index d10ba04a..00000000 --- a/tests/temporal/Workflow/SimpleSignalledWorkflowWithSleep.php +++ /dev/null @@ -1,34 +0,0 @@ -<?php - -declare(strict_types=1); - -namespace Temporal\Tests\Workflow; - -use Temporal\Workflow; -use Temporal\Workflow\WorkflowMethod; - -#[Workflow\WorkflowInterface] -class SimpleSignalledWorkflowWithSleep -{ - private $counter = 0; - - #[Workflow\SignalMethod(name: "add")] - public function add( - int $value - ) { - $this->counter += $value; - } - - #[WorkflowMethod(name: 'SimpleSignalledWorkflowWithSleep')] - public function handler(): iterable - { - // collect signals during one second - yield Workflow::timer(1); - - if (!Workflow::isReplaying()) { - sleep(1); - } - - return $this->counter; - } -} diff --git a/tests/temporal/Workflow/SimpleWorkflow.php b/tests/temporal/Workflow/SimpleWorkflow.php deleted file mode 100644 index 36e12f69..00000000 --- a/tests/temporal/Workflow/SimpleWorkflow.php +++ /dev/null @@ -1,31 +0,0 @@ -<?php - -declare(strict_types=1); - -namespace Temporal\Tests\Workflow; - -use Temporal\Activity\ActivityOptions; -use Temporal\Common\RetryOptions; -use Temporal\Workflow; -use Temporal\Workflow\WorkflowMethod; -use Temporal\Tests\Activity\SimpleActivity; - -#[Workflow\WorkflowInterface] -class SimpleWorkflow -{ - #[WorkflowMethod(name: 'SimpleWorkflow')] - public function handler( - string $input - ): iterable { - $simple = Workflow::newActivityStub( - SimpleActivity::class, - ActivityOptions::new() - ->withStartToCloseTimeout(5) - ->withRetryOptions( - RetryOptions::new()->withMaximumAttempts(2) - ) - ); - - return yield $simple->echo($input); - } -} diff --git a/tests/temporal/Workflow/TimerWorkflow.php b/tests/temporal/Workflow/TimerWorkflow.php deleted file mode 100644 index ab60d6c9..00000000 --- a/tests/temporal/Workflow/TimerWorkflow.php +++ /dev/null @@ -1,27 +0,0 @@ -<?php - -declare(strict_types=1); - -namespace Temporal\Tests\Workflow; - -use Temporal\Activity\ActivityOptions; -use Temporal\Workflow; -use Temporal\Workflow\WorkflowMethod; -use Temporal\Tests\Activity\SimpleActivity; - -#[Workflow\WorkflowInterface] -class TimerWorkflow -{ - #[WorkflowMethod(name: 'TimerWorkflow')] - public function handler(string $input): iterable - { - $simple = Workflow::newActivityStub( - SimpleActivity::class, - ActivityOptions::new()->withStartToCloseTimeout(5) - ); - - yield Workflow::timer(1); - - return yield $simple->lower($input); - } -} diff --git a/tests/temporal/Workflow/WaitWorkflow.php b/tests/temporal/Workflow/WaitWorkflow.php deleted file mode 100644 index 826952c1..00000000 --- a/tests/temporal/Workflow/WaitWorkflow.php +++ /dev/null @@ -1,33 +0,0 @@ -<?php - - -namespace Temporal\Tests\Workflow; - - -use Temporal\Workflow; -use Temporal\Workflow\SignalMethod; -use Temporal\Workflow\WorkflowInterface; -use Temporal\Workflow\WorkflowMethod; - -#[WorkflowInterface] -class WaitWorkflow -{ - private bool $ready = false; - private string $value; - - #[SignalMethod] - public function unlock( - string $value - ) { - $this->ready = true; - $this->value = $value; - } - - #[WorkflowMethod(name: 'WaitWorkflow')] - public function run() - { - yield Workflow::await(fn() => $this->ready); - - return $this->value; - } -} diff --git a/tests/temporal/Workflow/WithChildStubWorkflow.php b/tests/temporal/Workflow/WithChildStubWorkflow.php deleted file mode 100644 index cdebe3d8..00000000 --- a/tests/temporal/Workflow/WithChildStubWorkflow.php +++ /dev/null @@ -1,20 +0,0 @@ -<?php - -declare(strict_types=1); - -namespace Temporal\Tests\Workflow; - -use Temporal\Workflow; -use Temporal\Workflow\WorkflowMethod; - -#[Workflow\WorkflowInterface] -class WithChildStubWorkflow -{ - #[WorkflowMethod(name: 'WithChildStubWorkflow')] - public function handler(string $input): iterable - { - $child = Workflow::newChildWorkflowStub(SimpleWorkflow::class); - - return 'Child: ' . (yield $child->handler('child ' . $input)); - } -} diff --git a/tests/temporal/Workflow/WithChildWorkflow.php b/tests/temporal/Workflow/WithChildWorkflow.php deleted file mode 100644 index aac0979b..00000000 --- a/tests/temporal/Workflow/WithChildWorkflow.php +++ /dev/null @@ -1,25 +0,0 @@ -<?php - -declare(strict_types=1); - -namespace Temporal\Tests\Workflow; - -use Temporal\Workflow; -use Temporal\Workflow\WorkflowMethod; - -#[Workflow\WorkflowInterface] -class WithChildWorkflow -{ - #[WorkflowMethod(name: 'WithChildWorkflow')] - public function handler( - string $input - ): iterable { - $result = yield Workflow::executeChildWorkflow( - 'SimpleWorkflow', - ['child ' . $input], - Workflow\ChildWorkflowOptions::new() - ); - - return 'Child: ' . $result; - } -} diff --git a/tests/temporal/Workflow/WorkflowWithSequence.php b/tests/temporal/Workflow/WorkflowWithSequence.php deleted file mode 100644 index 9e813a9c..00000000 --- a/tests/temporal/Workflow/WorkflowWithSequence.php +++ /dev/null @@ -1,30 +0,0 @@ -<?php - - -namespace Temporal\Tests\Workflow; - -use Temporal\Activity\ActivityOptions; -use Temporal\Workflow; -use Temporal\Workflow\WorkflowMethod; -use Temporal\Tests\Activity\SimpleActivity; - -#[Workflow\WorkflowInterface] -class WorkflowWithSequence -{ - #[WorkflowMethod(name: 'WorkflowWithSequence')] - public function handler() - { - $simple = Workflow::newActivityStub( - SimpleActivity::class, - ActivityOptions::new()->withStartToCloseTimeout(5) - ); - - $a = $simple->echo('a'); - $b = $simple->echo('b'); - - yield $a; - yield $b; - - return 'OK'; - } -} diff --git a/tests/temporal/Workflow/WorkflowWithSignalledSteps.php b/tests/temporal/Workflow/WorkflowWithSignalledSteps.php deleted file mode 100644 index 5f1af766..00000000 --- a/tests/temporal/Workflow/WorkflowWithSignalledSteps.php +++ /dev/null @@ -1,51 +0,0 @@ -<?php - - -namespace Temporal\Tests\Workflow; - -use React\Promise\Deferred; -use React\Promise\PromiseInterface; -use Temporal\Activity\ActivityOptions; -use Temporal\Workflow; -use Temporal\Workflow\WorkflowMethod; -use Temporal\Tests\Activity\SimpleActivity; - -#[Workflow\WorkflowInterface] -class WorkflowWithSignalledSteps -{ - #[WorkflowMethod(name: 'WorkflowWithSignalledSteps')] - public function handler() - { - $simple = Workflow::newActivityStub( - SimpleActivity::class, - ActivityOptions::new()->withStartToCloseTimeout(5) - ); - - $value = 0; - Workflow::registerQuery('value', function () use (&$value) { - return $value; - }); - - yield $this->promiseSignal('begin'); - $value++; - - yield $this->promiseSignal('next1'); - $value++; - - yield $this->promiseSignal('next2'); - $value++; - - return $value; - } - - // is this correct? - private function promiseSignal(string $name): PromiseInterface - { - $signal = new Deferred(); - Workflow::registerSignal($name, function ($value) use ($signal) { - $signal->resolve($value); - }); - - return $signal->promise(); - } -} |