summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-01-27 14:36:00 +0300
committerGitHub <[email protected]>2021-01-27 14:36:00 +0300
commitb65bd870e2dfa1cb06d8cf8f142c34a418e9ae56 (patch)
tree5cb263c5f231ab1683010adda5a2f116da1d2815
parent7ac2fe05d1d460e9a7f92e8838ac1670743bc2d8 (diff)
parent6dd131497808f414ac1cb952d4b0b89b9e0689f8 (diff)
Merge branch 'master' into dependabot/go_modules/github.com/go-redis/redis/v8-8.4.11
-rw-r--r--.github/workflows/linux.yml3
-rw-r--r--.github/workflows/macos.yml2
-rw-r--r--.github/workflows/release.yml139
-rwxr-xr-xMakefile6
-rw-r--r--cmd/cli/reset.go107
-rw-r--r--cmd/cli/root.go129
-rw-r--r--cmd/cli/serve.go63
-rw-r--r--cmd/cli/version.go9
-rw-r--r--cmd/cli/workers.go109
-rw-r--r--cmd/main.go69
-rw-r--r--go.mod10
-rw-r--r--go.sum1
-rw-r--r--plugins/temporal/activity/activity_pool.go197
-rw-r--r--plugins/temporal/activity/plugin.go215
-rw-r--r--plugins/temporal/activity/rpc.go66
-rw-r--r--plugins/temporal/client/doc/doc.go1
-rw-r--r--plugins/temporal/client/doc/temporal.drawio1
-rw-r--r--plugins/temporal/client/plugin.go169
-rw-r--r--plugins/temporal/protocol/converter.go76
-rw-r--r--plugins/temporal/protocol/converter_test.go23
-rw-r--r--plugins/temporal/protocol/internal/protocol.pb.go167
-rw-r--r--plugins/temporal/protocol/json_codec.go225
-rw-r--r--plugins/temporal/protocol/message.go334
-rw-r--r--plugins/temporal/protocol/proto_codec.go145
-rw-r--r--plugins/temporal/protocol/protocol.go77
-rw-r--r--plugins/temporal/protocol/worker_info.go72
-rw-r--r--plugins/temporal/workflow/canceller.go41
-rw-r--r--plugins/temporal/workflow/canceller_test.go33
-rw-r--r--plugins/temporal/workflow/id_registry.go51
-rw-r--r--plugins/temporal/workflow/message_queue.go47
-rw-r--r--plugins/temporal/workflow/message_queue_test.go53
-rw-r--r--plugins/temporal/workflow/plugin.go203
-rw-r--r--plugins/temporal/workflow/process.go436
-rw-r--r--plugins/temporal/workflow/workflow_pool.go190
-rw-r--r--tests/composer.json5
-rw-r--r--tests/plugins/temporal/.rr.yaml22
-rw-r--r--tests/plugins/temporal/cancel_test.go291
-rw-r--r--tests/plugins/temporal/child_test.go84
-rw-r--r--tests/plugins/temporal/disaster_test.go114
-rw-r--r--tests/plugins/temporal/hp_test.go408
-rw-r--r--tests/plugins/temporal/query_test.go66
-rw-r--r--tests/plugins/temporal/server_test.go198
-rw-r--r--tests/plugins/temporal/signal_test.go170
-rw-r--r--tests/plugins/temporal/worker.php33
-rw-r--r--tests/temporal/Activity/HeartBeatActivity.php58
-rw-r--r--tests/temporal/Activity/SimpleActivity.php63
-rw-r--r--tests/temporal/Client/StartNewWorkflow.php23
-rw-r--r--tests/temporal/DTO/Message.php14
-rw-r--r--tests/temporal/DTO/User.php15
-rw-r--r--tests/temporal/Workflow/ActivityStubWorkflow.php39
-rw-r--r--tests/temporal/Workflow/AggregatedWorkflow.php30
-rw-r--r--tests/temporal/Workflow/AsyncActivityWorkflow.php28
-rw-r--r--tests/temporal/Workflow/BinaryWorkflow.php21
-rw-r--r--tests/temporal/Workflow/CancelSignalledChildWorkflow.php57
-rw-r--r--tests/temporal/Workflow/CanceledHeartbeatWorkflow.php29
-rw-r--r--tests/temporal/Workflow/CancelledMidflightWorkflow.php47
-rw-r--r--tests/temporal/Workflow/CancelledNestedWorkflow.php72
-rw-r--r--tests/temporal/Workflow/CancelledScopeWorkflow.php39
-rw-r--r--tests/temporal/Workflow/CancelledSingleScopeWorkflow.php55
-rw-r--r--tests/temporal/Workflow/CancelledWithCompensationWorkflow.php79
-rw-r--r--tests/temporal/Workflow/CancelledWorkflow.php31
-rw-r--r--tests/temporal/Workflow/ChainedWorkflow.php31
-rw-r--r--tests/temporal/Workflow/ChildStubWorkflow.php30
-rw-r--r--tests/temporal/Workflow/ComplexExceptionalWorkflow.php26
-rw-r--r--tests/temporal/Workflow/ContinuableWorkflow.php38
-rw-r--r--tests/temporal/Workflow/EmptyWorkflow.php16
-rw-r--r--tests/temporal/Workflow/ExceptionalActivityWorkflow.php25
-rw-r--r--tests/temporal/Workflow/ExceptionalWorkflow.php18
-rw-r--r--tests/temporal/Workflow/FailedHeartbeatWorkflow.php30
-rw-r--r--tests/temporal/Workflow/LoopWithSignalCoroutinesWorkflow.php55
-rw-r--r--tests/temporal/Workflow/LoopWorkflow.php51
-rw-r--r--tests/temporal/Workflow/ParallelScopesWorkflow.php36
-rw-r--r--tests/temporal/Workflow/PeriodicWorkflow.php19
-rw-r--r--tests/temporal/Workflow/ProtoPayloadWorkflow.php33
-rw-r--r--tests/temporal/Workflow/QueryWorkflow.php41
-rw-r--r--tests/temporal/Workflow/RuntimeSignalWorkflow.php31
-rw-r--r--tests/temporal/Workflow/SagaWorkflow.php54
-rw-r--r--tests/temporal/Workflow/SideEffectWorkflow.php30
-rw-r--r--tests/temporal/Workflow/SignalChildViaStubWorkflow.php25
-rw-r--r--tests/temporal/Workflow/SimpleDTOWorkflow.php35
-rw-r--r--tests/temporal/Workflow/SimpleHeartbeatWorkflow.php25
-rw-r--r--tests/temporal/Workflow/SimpleSignalledWorkflow.php30
-rw-r--r--tests/temporal/Workflow/SimpleSignalledWorkflowWithSleep.php34
-rw-r--r--tests/temporal/Workflow/SimpleWorkflow.php31
-rw-r--r--tests/temporal/Workflow/TimerWorkflow.php27
-rw-r--r--tests/temporal/Workflow/WaitWorkflow.php33
-rw-r--r--tests/temporal/Workflow/WithChildStubWorkflow.php20
-rw-r--r--tests/temporal/Workflow/WithChildWorkflow.php25
-rw-r--r--tests/temporal/Workflow/WorkflowWithSequence.php30
-rw-r--r--tests/temporal/Workflow/WorkflowWithSignalledSteps.php51
90 files changed, 1 insertions, 6489 deletions
diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml
index 076de74c..27df293a 100644
--- a/.github/workflows/linux.yml
+++ b/.github/workflows/linux.yml
@@ -69,9 +69,6 @@ jobs:
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pool.txt -covermode=atomic ./pkg/pool
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker.txt -covermode=atomic ./pkg/worker
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker_stack.txt -covermode=atomic ./pkg/worker_watcher
- go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/temporal.out -covermode=atomic ./tests/plugins/temporal
- go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/temporal_protocol.out -covermode=atomic ./plugins/temporal/protocol
- go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/temporal_workflow.out -covermode=atomic ./plugins/temporal/workflow
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/http_config.txt -covermode=atomic ./plugins/http/config
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/http.txt -covermode=atomic ./tests/plugins/http
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/informer.txt -covermode=atomic ./tests/plugins/informer
diff --git a/.github/workflows/macos.yml b/.github/workflows/macos.yml
index 00b51598..77f9cfda 100644
--- a/.github/workflows/macos.yml
+++ b/.github/workflows/macos.yml
@@ -67,8 +67,6 @@ jobs:
go test -v -race -tags=debug ./pkg/pool
go test -v -race -tags=debug ./pkg/worker
go test -v -race -tags=debug ./pkg/worker_watcher
- go test -v -race -tags=debug ./plugins/temporal/protocol
- go test -v -race -tags=debug ./plugins/temporal/workflow
go test -v -race -tags=debug ./plugins/http/config
go test -v -race -tags=debug ./tests/plugins/http
go test -v -race -tags=debug ./tests/plugins/informer
diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml
deleted file mode 100644
index 785f40ad..00000000
--- a/.github/workflows/release.yml
+++ /dev/null
@@ -1,139 +0,0 @@
-name: release
-
-on:
- release: # Docs: <https://help.github.com/en/articles/events-that-trigger-workflows#release-event-release>
- types: [published]
-
-jobs:
- build:
- name: Build for ${{ matrix.os }} (${{ matrix.arch }}, ${{ matrix.compiler }})
- runs-on: ubuntu-20.04
- strategy:
- fail-fast: false
- matrix:
- os: [windows, freebsd, darwin] # linux, freebsd, darwin, windows
- compiler: [gcc] # gcc, musl-gcc
- archiver: [zip] # tar, zip
- arch: [amd64] # amd64, 386
- include:
- - os: linux
- compiler: gcc
- archiver: tar
- arch: amd64
- - os: ''
- compiler: musl-gcc # more info: <https://musl.libc.org/>
- archiver: zip
- arch: amd64
- steps:
- - name: Set up Go
- uses: actions/setup-go@v2
- with:
- go-version: 1.15.6
-
- - name: Check out code
- uses: actions/checkout@v2
-
- - name: Install musl
- if: matrix.compiler == 'musl-gcc'
- run: sudo apt-get install -y musl-tools
-
- - name: Download dependencies
- run: go mod download # `-x` means "verbose" mode
-
- - name: Generate builder values
- id: values
- run: |
- echo "::set-output name=version::`echo ${GITHUB_REF##*/} | sed -e 's/^[vV ]*//'`"
- echo "::set-output name=timestamp::`date +%FT%T%z`"
- echo "::set-output name=binary-name::rr`[ ${{ matrix.os }} = 'windows' ] && echo '.exe'`"
-
- - name: Compile binary file
- env:
- GOOS: ${{ matrix.os }}
- GOARCH: ${{ matrix.arch }}
- CC: ${{ matrix.compiler }}
- CGO_ENABLED: 0
- LDFLAGS: >-
- -s
- -X github.com/spiral/roadrunner/cmd/cli.Version=${{ steps.values.outputs.version }}
- -X github.com/spiral/roadrunner/cmd/cli.BuildTime=${{ steps.values.outputs.timestamp }}
- run: |
- go build -trimpath -ldflags "$LDFLAGS" -o "./${{ steps.values.outputs.binary-name }}" ./cmd/main.go
- stat "./${{ steps.values.outputs.binary-name }}"
-
- - name: Generate distributive directory name
- id: dist-dir
- run: >
- echo "::set-output name=name::roadrunner-${{ steps.values.outputs.version }}-$(
- [ ${{ matrix.os }} != '' ] && echo '${{ matrix.os }}' || echo 'unknown'
- )$(
- [ ${{ matrix.compiler }} = 'musl-gcc' ] && echo '-musl'
- )-${{ matrix.arch }}"
-
- - name: Generate distributive archive name
- id: dist-arch
- run: >
- echo "::set-output name=name::${{ steps.dist-dir.outputs.name }}.$(
- case ${{ matrix.archiver }} in
- zip) echo 'zip';;
- tar) echo 'tar.gz';;
- *) exit 10;
- esac
- )"
-
- - name: Create distributive
- run: |
- mkdir ${{ steps.dist-dir.outputs.name }}
- mv "./${{ steps.values.outputs.binary-name }}" ./${{ steps.dist-dir.outputs.name }}/
- cp ./README.md ./CHANGELOG.md ./LICENSE ./${{ steps.dist-dir.outputs.name }}/
-
- - name: Pack distributive using tar
- if: matrix.archiver == 'tar'
- run: tar -zcf "${{ steps.dist-arch.outputs.name }}" "${{ steps.dist-dir.outputs.name }}"
-
- - name: Pack distributive using zip
- if: matrix.archiver == 'zip'
- run: zip -r -q "${{ steps.dist-arch.outputs.name }}" "${{ steps.dist-dir.outputs.name }}"
-
- - name: Upload artifact
- uses: actions/upload-artifact@v2
- with:
- name: ${{ steps.dist-dir.outputs.name }}
- path: ${{ steps.dist-arch.outputs.name }}
- if-no-files-found: error
- retention-days: 30
-
- - name: Upload binaries to release
- uses: svenstaro/upload-release-action@v2
- with:
- repo_token: ${{ secrets.GITHUB_TOKEN }}
- file: ${{ steps.dist-arch.outputs.name }}
- asset_name: ${{ steps.dist-arch.outputs.name }}
- tag: ${{ github.ref }}
-
- docker:
- name: Build docker image
- runs-on: ubuntu-20.04
- steps:
- - name: Check out code
- uses: actions/checkout@v2
-
- - name: Make docker login
- run: echo "${{ secrets.DOCKER_PASSWORD }}" | docker login -u "${{ secrets.DOCKER_LOGIN }}" --password-stdin
-
- - name: Generate builder values
- id: values
- run: |
- echo "::set-output name=version::`echo ${GITHUB_REF##*/} | sed -e 's/^[vV ]*//'`"
- echo "::set-output name=timestamp::`date +%FT%T%z`"
-
- - name: Build image
- run: |
- docker build \
- --tag "spiralscout/roadrunner:${{ steps.values.outputs.version }}" \
- --build-arg "APP_VERSION=${{ steps.values.outputs.version }}" \
- --build-arg "BUILD_TIME=${{ steps.values.outputs.timestamp }}" \
- .
-
- - name: Push image into registry
- run: docker push "spiralscout/roadrunner:${{ steps.values.outputs.version }}"
diff --git a/Makefile b/Makefile
index bfe915f5..fead8744 100755
--- a/Makefile
+++ b/Makefile
@@ -32,9 +32,6 @@ test_coverage:
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/pool.out -covermode=atomic ./pkg/pool
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/worker.out -covermode=atomic ./pkg/worker
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/worker_stack.out -covermode=atomic ./pkg/worker_watcher
- go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/temporal.out -covermode=atomic ./tests/plugins/temporal
- go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/temporal_protocol.out -covermode=atomic ./plugins/temporal/protocol
- go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/temporal_workflow.out -covermode=atomic ./plugins/temporal/workflow
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/http.out -covermode=atomic ./tests/plugins/http
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/informer.out -covermode=atomic ./tests/plugins/informer
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/reload.out -covermode=atomic ./tests/plugins/reload
@@ -66,9 +63,6 @@ test: ## Run application tests
go test -v -race -tags=debug ./pkg/pool
go test -v -race -tags=debug ./pkg/worker
go test -v -race -tags=debug ./pkg/worker_watcher
- go test -v -race -tags=debug ./tests/plugins/temporal
- go test -v -race -tags=debug ./plugins/temporal/protocol
- go test -v -race -tags=debug ./plugins/temporal/workflow
go test -v -race -tags=debug ./tests/plugins/http
go test -v -race -tags=debug ./plugins/http/config
go test -v -race -tags=debug ./tests/plugins/informer
diff --git a/cmd/cli/reset.go b/cmd/cli/reset.go
deleted file mode 100644
index a5055a53..00000000
--- a/cmd/cli/reset.go
+++ /dev/null
@@ -1,107 +0,0 @@
-package cli
-
-import (
- "fmt"
- "sync"
-
- "github.com/fatih/color"
- "github.com/mattn/go-runewidth"
- "github.com/spf13/cobra"
- "github.com/spiral/errors"
- "github.com/vbauerster/mpb/v5"
- "github.com/vbauerster/mpb/v5/decor"
-)
-
-// List is the resetter.List RPC method
-const List string = "resetter.List"
-
-// Reset is the resetter.Reset RPC method
-const Reset string = "resetter.Reset"
-
-func init() {
- root.AddCommand(&cobra.Command{
- Use: "reset",
- Short: "Reset workers of all or specific RoadRunner service",
- RunE: resetHandler,
- })
-}
-
-func resetHandler(_ *cobra.Command, args []string) error {
- const op = errors.Op("reset_handler")
- client, err := RPCClient()
- if err != nil {
- return err
- }
- defer func() {
- _ = client.Close()
- }()
-
- var services []string
- if len(args) != 0 {
- services = args
- } else {
- err = client.Call(List, true, &services)
- if err != nil {
- return errors.E(op, err)
- }
- }
-
- var wg sync.WaitGroup
- pr := mpb.New(mpb.WithWaitGroup(&wg), mpb.WithWidth(6))
- wg.Add(len(services))
-
- for _, service := range services {
- var (
- bar *mpb.Bar
- name = runewidth.FillRight(fmt.Sprintf("Resetting plugin: [%s]", color.HiYellowString(service)), 27)
- result = make(chan interface{})
- )
-
- bar = pr.AddSpinner(
- 1,
- mpb.SpinnerOnMiddle,
- mpb.SpinnerStyle([]string{"∙∙∙", "●∙∙", "∙●∙", "∙∙●", "∙∙∙"}),
- mpb.PrependDecorators(decor.Name(name)),
- mpb.AppendDecorators(onComplete(result)),
- )
-
- // simulating some work
- go func(service string, result chan interface{}) {
- defer wg.Done()
- defer bar.Increment()
-
- var done bool
- err = client.Call(Reset, service, &done)
- if err != nil {
- result <- errors.E(op, err)
- return
- }
- result <- nil
- }(service, result)
- }
-
- pr.Wait()
- return nil
-}
-
-func onComplete(result chan interface{}) decor.Decorator {
- var (
- msg = ""
- fn = func(s decor.Statistics) string {
- select {
- case r := <-result:
- if err, ok := r.(error); ok {
- msg = color.HiRedString(err.Error())
- return msg
- }
-
- msg = color.HiGreenString("done")
- return msg
- default:
- return msg
- }
- }
- )
-
- return decor.Any(fn)
-}
diff --git a/cmd/cli/root.go b/cmd/cli/root.go
deleted file mode 100644
index 6f73aecf..00000000
--- a/cmd/cli/root.go
+++ /dev/null
@@ -1,129 +0,0 @@
-package cli
-
-import (
- "log"
- "net/http/pprof"
- "net/rpc"
- "os"
- "path/filepath"
-
- "github.com/spiral/errors"
- goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
- rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"
-
- "github.com/spiral/roadrunner/v2/plugins/config"
-
- "net/http"
-
- "github.com/spf13/cobra"
- endure "github.com/spiral/endure/pkg/container"
-)
-
-var (
- // WorkDir is working directory
- WorkDir string
- // CfgFile is path to the .rr.yaml
- CfgFile string
- // Debug mode
- Debug bool
- // Container is the pointer to the Endure container
- Container *endure.Endure
- cfg *config.Viper
- root = &cobra.Command{
- Use: "rr",
- SilenceErrors: true,
- SilenceUsage: true,
- Version: Version,
- }
-)
-
-func Execute() {
- if err := root.Execute(); err != nil {
- // exit with error, fatal invoke os.Exit(1)
- log.Fatal(err)
- }
-}
-
-func init() {
- root.PersistentFlags().StringVarP(&CfgFile, "config", "c", ".rr.yaml", "config file (default is .rr.yaml)")
- root.PersistentFlags().StringVarP(&WorkDir, "WorkDir", "w", "", "work directory")
- root.PersistentFlags().BoolVarP(&Debug, "debug", "d", false, "debug mode")
- cobra.OnInitialize(func() {
- if CfgFile != "" {
- if absPath, err := filepath.Abs(CfgFile); err == nil {
- CfgFile = absPath
-
- // force working absPath related to config file
- if err := os.Chdir(filepath.Dir(absPath)); err != nil {
- panic(err)
- }
- }
- }
-
- if WorkDir != "" {
- if err := os.Chdir(WorkDir); err != nil {
- panic(err)
- }
- }
-
- cfg = &config.Viper{}
- cfg.Path = CfgFile
- cfg.Prefix = "rr"
-
- // register config
- err := Container.Register(cfg)
- if err != nil {
- panic(err)
- }
-
- // if debug mode is on - run debug server
- if Debug {
- runDebugServer()
- }
- })
-}
-
-// RPCClient is using to make a requests to the ./rr reset, ./rr workers
-func RPCClient() (*rpc.Client, error) {
- rpcConfig := &rpcPlugin.Config{}
-
- err := cfg.Init()
- if err != nil {
- return nil, err
- }
-
- if !cfg.Has(rpcPlugin.PluginName) {
- return nil, errors.E("rpc service disabled")
- }
-
- err = cfg.UnmarshalKey(rpcPlugin.PluginName, rpcConfig)
- if err != nil {
- return nil, err
- }
- rpcConfig.InitDefaults()
-
- conn, err := rpcConfig.Dialer()
- if err != nil {
- return nil, err
- }
-
- return rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)), nil
-}
-
-// debug server
-func runDebugServer() {
- mux := http.NewServeMux()
- mux.HandleFunc("/debug/pprof/", pprof.Index)
- mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
- mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
- mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
- mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
- srv := http.Server{
- Addr: ":6061",
- Handler: mux,
- }
-
- if err := srv.ListenAndServe(); err != nil {
- log.Fatal(err)
- }
-}
diff --git a/cmd/cli/serve.go b/cmd/cli/serve.go
deleted file mode 100644
index 993ec477..00000000
--- a/cmd/cli/serve.go
+++ /dev/null
@@ -1,63 +0,0 @@
-package cli
-
-import (
- "log"
- "os"
- "os/signal"
- "syscall"
-
- "github.com/spf13/cobra"
- "github.com/spiral/errors"
- "go.uber.org/multierr"
-)
-
-func init() {
- root.AddCommand(&cobra.Command{
- Use: "serve",
- Short: "Start RoadRunner server",
- RunE: handler,
- })
-}
-
-func handler(_ *cobra.Command, _ []string) error {
- const op = errors.Op("handle_serve_command")
- /*
- We need to have path to the config at the RegisterTarget stage
- But after cobra.Execute, because cobra fills up cli variables on this stage
- */
-
- err := Container.Init()
- if err != nil {
- return errors.E(op, err)
- }
-
- errCh, err := Container.Serve()
- if err != nil {
- return errors.E(op, err)
- }
-
- // https://golang.org/pkg/os/signal/#Notify
- // should be of buffer size at least 1
- c := make(chan os.Signal, 1)
- signal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGINT)
-
- for {
- select {
- case e := <-errCh:
- err = multierr.Append(err, e.Error)
- log.Printf("error occurred: %v, service: %s", e.Error.Error(), e.VertexID)
- er := Container.Stop()
- if er != nil {
- err = multierr.Append(err, er)
- return errors.E(op, err)
- }
- return errors.E(op, err)
- case <-c:
- err = Container.Stop()
- if err != nil {
- return errors.E(op, err)
- }
- return nil
- }
- }
-}
diff --git a/cmd/cli/version.go b/cmd/cli/version.go
deleted file mode 100644
index 89728bd2..00000000
--- a/cmd/cli/version.go
+++ /dev/null
@@ -1,9 +0,0 @@
-package cli
-
-var (
- // Version - defines build version.
- Version string = "local"
-
- // BuildTime - defined build time.
- BuildTime string = "development"
-)
diff --git a/cmd/cli/workers.go b/cmd/cli/workers.go
deleted file mode 100644
index 09642a58..00000000
--- a/cmd/cli/workers.go
+++ /dev/null
@@ -1,109 +0,0 @@
-package cli
-
-import (
- "fmt"
- "log"
- "net/rpc"
- "os"
- "os/signal"
- "syscall"
- "time"
-
- tm "github.com/buger/goterm"
- "github.com/fatih/color"
- "github.com/spf13/cobra"
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/plugins/informer"
- "github.com/spiral/roadrunner/v2/tools"
-)
-
-// use interactive mode
-var interactive bool
-
-const InformerList string = "informer.List"
-const InformerWorkers string = "informer.Workers"
-
-func init() {
- workersCommand := &cobra.Command{
- Use: "workers",
- Short: "Show information about active roadrunner workers",
- RunE: workersHandler,
- }
-
- workersCommand.Flags().BoolVarP(
- &interactive,
- "interactive",
- "i",
- false,
- "render interactive workers table",
- )
-
- root.AddCommand(workersCommand)
-}
-
-func workersHandler(_ *cobra.Command, args []string) error {
- const op = errors.Op("handle_workers_command")
- // get RPC client
- client, err := RPCClient()
- if err != nil {
- return err
- }
- defer func() {
- err := client.Close()
- if err != nil {
- log.Printf("error when closing RPCClient: error %v", err)
- }
- }()
-
- var plugins []string
- // assume user wants to show workers from particular plugin
- if len(args) != 0 {
- plugins = args
- } else {
- err = client.Call(InformerList, true, &plugins)
- if err != nil {
- return errors.E(op, err)
- }
- }
-
- if !interactive {
- return showWorkers(plugins, client)
- }
-
- // https://golang.org/pkg/os/signal/#Notify
- // should be of buffer size at least 1
- c := make(chan os.Signal, 1)
- signal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGINT)
-
- tm.Clear()
- tt := time.NewTicker(time.Second)
- defer tt.Stop()
- for {
- select {
- case <-c:
- return nil
- case <-tt.C:
- tm.MoveCursor(1, 1)
- err := showWorkers(plugins, client)
- if err != nil {
- return errors.E(op, err)
- }
- tm.Flush()
- }
- }
-}
-
-func showWorkers(plugins []string, client *rpc.Client) error {
- const op = errors.Op("show_workers")
- for _, plugin := range plugins {
- list := &informer.WorkerList{}
- err := client.Call(InformerWorkers, plugin, &list)
- if err != nil {
- return errors.E(op, err)
- }
-
- fmt.Printf("Workers of [%s]:\n", color.HiYellowString(plugin))
- tools.WorkerTable(os.Stdout, list.Workers).Render()
- }
- return nil
-}
diff --git a/cmd/main.go b/cmd/main.go
deleted file mode 100644
index 8074f316..00000000
--- a/cmd/main.go
+++ /dev/null
@@ -1,69 +0,0 @@
-package main
-
-import (
- "log"
-
- endure "github.com/spiral/endure/pkg/container"
- "github.com/spiral/roadrunner/v2/cmd/cli"
- httpPlugin "github.com/spiral/roadrunner/v2/plugins/http"
- "github.com/spiral/roadrunner/v2/plugins/informer"
- "github.com/spiral/roadrunner/v2/plugins/temporal/activity"
- temporalClient "github.com/spiral/roadrunner/v2/plugins/temporal/client"
- "github.com/spiral/roadrunner/v2/plugins/temporal/workflow"
-
- "github.com/spiral/roadrunner/v2/plugins/kv/boltdb"
- "github.com/spiral/roadrunner/v2/plugins/kv/memcached"
- "github.com/spiral/roadrunner/v2/plugins/kv/memory"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/spiral/roadrunner/v2/plugins/metrics"
- "github.com/spiral/roadrunner/v2/plugins/redis"
- "github.com/spiral/roadrunner/v2/plugins/reload"
- "github.com/spiral/roadrunner/v2/plugins/resetter"
- "github.com/spiral/roadrunner/v2/plugins/rpc"
- "github.com/spiral/roadrunner/v2/plugins/server"
-)
-
-func main() {
- var err error
- cli.Container, err = endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel), endure.RetryOnFail(false))
- if err != nil {
- log.Fatal(err)
- }
-
- err = cli.Container.RegisterAll(
- // logger plugin
- &logger.ZapLogger{},
- // metrics plugin
- &metrics.Plugin{},
- // redis plugin (internal)
- &redis.Plugin{},
- // http server plugin
- &httpPlugin.Plugin{},
- // reload plugin
- &reload.Plugin{},
- // informer plugin (./rr workers, ./rr workers -i)
- &informer.Plugin{},
- // resetter plugin (./rr reset)
- &resetter.Plugin{},
- // rpc plugin (workers, reset)
- &rpc.Plugin{},
- // server plugin (NewWorker, NewWorkerPool)
- &server.Plugin{},
- // memcached kv plugin
- &memcached.Plugin{},
- // in-memory kv plugin
- &memory.Plugin{},
- // boltdb driver
- &boltdb.Plugin{},
-
- // temporal plugins
- &temporalClient.Plugin{},
- &activity.Plugin{},
- &workflow.Plugin{},
- )
- if err != nil {
- log.Fatal(err)
- }
-
- cli.Execute()
-}
diff --git a/go.mod b/go.mod
index 299e9462..a9835b1b 100644
--- a/go.mod
+++ b/go.mod
@@ -7,38 +7,28 @@ require (
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect
github.com/alicebob/miniredis/v2 v2.14.1
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
- github.com/buger/goterm v0.0.0-20200322175922-2f3e71b85129
- github.com/cenkalti/backoff/v4 v4.1.0
github.com/dustin/go-humanize v1.0.0
github.com/fatih/color v1.10.0
github.com/go-ole/go-ole v1.2.5 // indirect
github.com/go-redis/redis/v8 v8.4.11
github.com/gofiber/fiber/v2 v2.3.3
github.com/golang/mock v1.4.4
- github.com/golang/protobuf v1.4.3
github.com/hashicorp/go-multierror v1.1.0
github.com/json-iterator/go v1.1.10
- github.com/mattn/go-runewidth v0.0.10
github.com/olekukonko/tablewriter v0.0.4
- github.com/pborman/uuid v1.2.1
github.com/prometheus/client_golang v1.9.0
github.com/shirou/gopsutil v3.20.12+incompatible
- github.com/spf13/cobra v1.1.1
github.com/spf13/viper v1.7.1
github.com/spiral/endure v1.0.0-beta21
github.com/spiral/errors v1.0.9
github.com/spiral/goridge/v3 v3.0.0
github.com/stretchr/testify v1.7.0
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a
- github.com/vbauerster/mpb/v5 v5.4.0
github.com/yookoala/gofast v0.4.0
go.etcd.io/bbolt v1.3.5
- go.temporal.io/api v1.4.0
- go.temporal.io/sdk v1.4.0
go.uber.org/multierr v1.6.0
go.uber.org/zap v1.16.0
golang.org/x/net v0.0.0-20201224014010-6772e930b67b
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a
golang.org/x/sys v0.0.0-20210105210732-16f7687f5001
- google.golang.org/protobuf v1.25.0
)
diff --git a/go.sum b/go.sum
index c7d5b8c4..d00c14a6 100644
--- a/go.sum
+++ b/go.sum
@@ -287,6 +287,7 @@ github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNx
github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
+github.com/mattn/go-runewidth v0.0.7 h1:Ei8KR0497xHyKJPAv59M1dkC+rOZCMBJ+t3fZ+twI54=
github.com/mattn/go-runewidth v0.0.7/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0=
github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
diff --git a/plugins/temporal/activity/activity_pool.go b/plugins/temporal/activity/activity_pool.go
deleted file mode 100644
index d09722ce..00000000
--- a/plugins/temporal/activity/activity_pool.go
+++ /dev/null
@@ -1,197 +0,0 @@
-package activity
-
-import (
- "context"
- "sync"
- "sync/atomic"
-
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/events"
- "github.com/spiral/roadrunner/v2/pkg/pool"
- rrWorker "github.com/spiral/roadrunner/v2/pkg/worker"
- "github.com/spiral/roadrunner/v2/plugins/server"
- "github.com/spiral/roadrunner/v2/plugins/temporal/client"
- rrt "github.com/spiral/roadrunner/v2/plugins/temporal/protocol"
- "go.temporal.io/api/common/v1"
- "go.temporal.io/sdk/activity"
- "go.temporal.io/sdk/converter"
- "go.temporal.io/sdk/internalbindings"
- "go.temporal.io/sdk/worker"
-)
-
-// RR_MODE env variable
-const RR_MODE = "RR_MODE" //nolint:golint,stylecheck
-// RR_CODEC env variable
-const RR_CODEC = "RR_CODEC" //nolint:golint,stylecheck
-
-//
-const doNotCompleteOnReturn = "doNotCompleteOnReturn"
-
-type activityPool interface {
- Start(ctx context.Context, temporal client.Temporal) error
- Destroy(ctx context.Context) error
- Workers() []rrWorker.SyncWorker
- ActivityNames() []string
- GetActivityContext(taskToken []byte) (context.Context, error)
-}
-
-type activityPoolImpl struct {
- dc converter.DataConverter
- codec rrt.Codec
- seqID uint64
- activities []string
- wp pool.Pool
- tWorkers []worker.Worker
- running sync.Map
-}
-
-// newActivityPool
-func newActivityPool(codec rrt.Codec, listener events.Listener, poolConfig pool.Config, server server.Server) (activityPool, error) {
- const op = errors.Op("new_activity_pool")
- // env variables
- env := map[string]string{RR_MODE: RRMode, RR_CODEC: codec.GetName()}
- wp, err := server.NewWorkerPool(context.Background(), poolConfig, env, listener)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- return &activityPoolImpl{
- codec: codec,
- wp: wp,
- running: sync.Map{},
- }, nil
-}
-
-// initWorkers request workers info from underlying PHP and configures temporal workers linked to the pool.
-func (pool *activityPoolImpl) Start(ctx context.Context, temporal client.Temporal) error {
- const op = errors.Op("activity_pool_start")
- pool.dc = temporal.GetDataConverter()
-
- err := pool.initWorkers(ctx, temporal)
- if err != nil {
- return errors.E(op, err)
- }
-
- for i := 0; i < len(pool.tWorkers); i++ {
- err := pool.tWorkers[i].Start()
- if err != nil {
- return errors.E(op, err)
- }
- }
-
- return nil
-}
-
-// initWorkers request workers info from underlying PHP and configures temporal workers linked to the pool.
-func (pool *activityPoolImpl) Destroy(ctx context.Context) error {
- for i := 0; i < len(pool.tWorkers); i++ {
- pool.tWorkers[i].Stop()
- }
-
- pool.wp.Destroy(ctx)
- return nil
-}
-
-// Workers returns list of all allocated workers.
-func (pool *activityPoolImpl) Workers() []rrWorker.SyncWorker {
- return pool.wp.Workers()
-}
-
-// ActivityNames returns list of all available activity names.
-func (pool *activityPoolImpl) ActivityNames() []string {
- return pool.activities
-}
-
-// ActivityNames returns list of all available activity names.
-func (pool *activityPoolImpl) GetActivityContext(taskToken []byte) (context.Context, error) {
- const op = errors.Op("activity_pool_get_activity_context")
- c, ok := pool.running.Load(string(taskToken))
- if !ok {
- return nil, errors.E(op, errors.Str("heartbeat on non running activity"))
- }
-
- return c.(context.Context), nil
-}
-
-// initWorkers request workers workflows from underlying PHP and configures temporal workers linked to the pool.
-func (pool *activityPoolImpl) initWorkers(ctx context.Context, temporal client.Temporal) error {
- const op = errors.Op("activity_pool_create_temporal_worker")
-
- workerInfo, err := rrt.FetchWorkerInfo(pool.codec, pool.wp, temporal.GetDataConverter())
- if err != nil {
- return errors.E(op, err)
- }
-
- pool.activities = make([]string, 0)
- pool.tWorkers = make([]worker.Worker, 0)
-
- for i := 0; i < len(workerInfo); i++ {
- w, err := temporal.CreateWorker(workerInfo[i].TaskQueue, workerInfo[i].Options)
- if err != nil {
- return errors.E(op, err, pool.Destroy(ctx))
- }
-
- pool.tWorkers = append(pool.tWorkers, w)
- for j := 0; j < len(workerInfo[i].Activities); j++ {
- w.RegisterActivityWithOptions(pool.executeActivity, activity.RegisterOptions{
- Name: workerInfo[i].Activities[j].Name,
- DisableAlreadyRegisteredCheck: false,
- })
-
- pool.activities = append(pool.activities, workerInfo[i].Activities[j].Name)
- }
- }
-
- return nil
-}
-
-// executes activity with underlying worker.
-func (pool *activityPoolImpl) executeActivity(ctx context.Context, args *common.Payloads) (*common.Payloads, error) {
- const op = errors.Op("activity_pool_execute_activity")
-
- heartbeatDetails := &common.Payloads{}
- if activity.HasHeartbeatDetails(ctx) {
- err := activity.GetHeartbeatDetails(ctx, &heartbeatDetails)
- if err != nil {
- return nil, errors.E(op, err)
- }
- }
-
- var info = activity.GetInfo(ctx)
- var msg = rrt.Message{
- ID: atomic.AddUint64(&pool.seqID, 1),
- Command: rrt.InvokeActivity{
- Name: info.ActivityType.Name,
- Info: info,
- HeartbeatDetails: len(heartbeatDetails.Payloads),
- },
- Payloads: args,
- }
-
- if len(heartbeatDetails.Payloads) != 0 {
- msg.Payloads.Payloads = append(msg.Payloads.Payloads, heartbeatDetails.Payloads...)
- }
-
- pool.running.Store(string(info.TaskToken), ctx)
- defer pool.running.Delete(string(info.TaskToken))
-
- result, err := pool.codec.Execute(pool.wp, rrt.Context{TaskQueue: info.TaskQueue}, msg)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- if len(result) != 1 {
- return nil, errors.E(op, errors.Str("invalid activity worker response"))
- }
-
- out := result[0]
- if out.Failure != nil {
- if out.Failure.Message == doNotCompleteOnReturn {
- return nil, activity.ErrResultPending
- }
-
- return nil, internalbindings.ConvertFailureToError(out.Failure, pool.dc)
- }
-
- return out.Payloads, nil
-}
diff --git a/plugins/temporal/activity/plugin.go b/plugins/temporal/activity/plugin.go
deleted file mode 100644
index 5e562a8d..00000000
--- a/plugins/temporal/activity/plugin.go
+++ /dev/null
@@ -1,215 +0,0 @@
-package activity
-
-import (
- "context"
- "time"
-
- "github.com/cenkalti/backoff/v4"
- "github.com/spiral/roadrunner/v2/pkg/events"
- "github.com/spiral/roadrunner/v2/pkg/worker"
-
- "sync"
- "sync/atomic"
-
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/spiral/roadrunner/v2/plugins/server"
- "github.com/spiral/roadrunner/v2/plugins/temporal/client"
-)
-
-const (
- // PluginName defines public service name.
- PluginName = "activities"
-
- // RRMode sets as RR_MODE env variable to let worker know about the mode to run.
- RRMode = "temporal/activity"
-)
-
-// Plugin to manage activity execution.
-type Plugin struct {
- temporal client.Temporal
- events events.Handler
- server server.Server
- log logger.Logger
- mu sync.Mutex
- reset chan struct{}
- pool activityPool
- closing int64
-}
-
-// Init configures activity service.
-func (p *Plugin) Init(temporal client.Temporal, server server.Server, log logger.Logger) error {
- const op = errors.Op("activity_plugin_init")
- if temporal.GetConfig().Activities == nil {
- // no need to serve activities
- return errors.E(op, errors.Disabled)
- }
-
- p.temporal = temporal
- p.server = server
- p.events = events.NewEventsHandler()
- p.log = log
- p.reset = make(chan struct{})
-
- return nil
-}
-
-// Serve activities with underlying workers.
-func (p *Plugin) Serve() chan error {
- const op = errors.Op("activity_plugin_serve")
-
- errCh := make(chan error, 1)
- pool, err := p.startPool()
- if err != nil {
- errCh <- errors.E(op, err)
- return errCh
- }
-
- p.pool = pool
-
- go func() {
- for {
- select {
- case <-p.reset:
- if atomic.LoadInt64(&p.closing) == 1 {
- return
- }
-
- err := p.replacePool()
- if err == nil {
- continue
- }
-
- bkoff := backoff.NewExponentialBackOff()
- bkoff.InitialInterval = time.Second
-
- err = backoff.Retry(p.replacePool, bkoff)
- if err != nil {
- errCh <- errors.E(op, err)
- }
- }
- }
- }()
-
- return errCh
-}
-
-// Stop stops the serving plugin.
-func (p *Plugin) Stop() error {
- atomic.StoreInt64(&p.closing, 1)
- const op = errors.Op("activity_plugin_stop")
-
- pool := p.getPool()
- if pool != nil {
- p.pool = nil
- err := pool.Destroy(context.Background())
- if err != nil {
- return errors.E(op, err)
- }
- return nil
- }
-
- return nil
-}
-
-// Name of the service.
-func (p *Plugin) Name() string {
- return PluginName
-}
-
-// RPC returns associated rpc service.
-func (p *Plugin) RPC() interface{} {
- return &rpc{srv: p, client: p.temporal.GetClient()}
-}
-
-// Workers returns pool workers.
-func (p *Plugin) Workers() []worker.SyncWorker {
- return p.getPool().Workers()
-}
-
-// ActivityNames returns list of all available activities.
-func (p *Plugin) ActivityNames() []string {
- return p.pool.ActivityNames()
-}
-
-// Reset resets underlying workflow pool with new copy.
-func (p *Plugin) Reset() error {
- p.reset <- struct{}{}
-
- return nil
-}
-
-// AddListener adds event listeners to the service.
-func (p *Plugin) AddListener(listener events.Listener) {
- p.events.AddListener(listener)
-}
-
-// AddListener adds event listeners to the service.
-func (p *Plugin) poolListener(event interface{}) {
- if ev, ok := event.(events.PoolEvent); ok {
- if ev.Event == events.EventPoolError {
- p.log.Error("Activity pool error", "error", ev.Payload.(error))
- p.reset <- struct{}{}
- }
- }
-
- p.events.Push(event)
-}
-
-// AddListener adds event listeners to the service.
-func (p *Plugin) startPool() (activityPool, error) {
- pool, err := newActivityPool(
- p.temporal.GetCodec().WithLogger(p.log),
- p.poolListener,
- *p.temporal.GetConfig().Activities,
- p.server,
- )
-
- if err != nil {
- return nil, errors.E(errors.Op("newActivityPool"), err)
- }
-
- err = pool.Start(context.Background(), p.temporal)
- if err != nil {
- return nil, errors.E(errors.Op("startActivityPool"), err)
- }
-
- p.log.Debug("Started activity processing", "activities", pool.ActivityNames())
-
- return pool, nil
-}
-
-func (p *Plugin) replacePool() error {
- pool, err := p.startPool()
- if err != nil {
- p.log.Error("Replace activity pool failed", "error", err)
- return errors.E(errors.Op("newActivityPool"), err)
- }
-
- p.log.Debug("Replace activity pool")
-
- var previous activityPool
-
- p.mu.Lock()
- previous, p.pool = p.pool, pool
- p.mu.Unlock()
-
- errD := previous.Destroy(context.Background())
- if errD != nil {
- p.log.Error(
- "Unable to destroy expired activity pool",
- "error",
- errors.E(errors.Op("destroyActivityPool"), errD),
- )
- }
-
- return nil
-}
-
-// getPool returns currently pool.
-func (p *Plugin) getPool() activityPool {
- p.mu.Lock()
- defer p.mu.Unlock()
-
- return p.pool
-}
diff --git a/plugins/temporal/activity/rpc.go b/plugins/temporal/activity/rpc.go
deleted file mode 100644
index 49efcd4f..00000000
--- a/plugins/temporal/activity/rpc.go
+++ /dev/null
@@ -1,66 +0,0 @@
-package activity
-
-import (
- v1Proto "github.com/golang/protobuf/proto" //nolint:staticcheck
- commonpb "go.temporal.io/api/common/v1"
- "go.temporal.io/sdk/activity"
- "go.temporal.io/sdk/client"
- "google.golang.org/protobuf/proto"
-)
-
-/*
-- the method's type is exported.
-- the method is exported.
-- the method has two arguments, both exported (or builtin) types.
-- the method's second argument is a pointer.
-- the method has return type error.
-*/
-type rpc struct {
- srv *Plugin
- client client.Client
-}
-
-// RecordHeartbeatRequest sent by activity to record current state.
-type RecordHeartbeatRequest struct {
- TaskToken []byte `json:"taskToken"`
- Details []byte `json:"details"`
-}
-
-// RecordHeartbeatResponse sent back to the worker to indicate that activity was cancelled.
-type RecordHeartbeatResponse struct {
- Canceled bool `json:"canceled"`
-}
-
-// RecordActivityHeartbeat records heartbeat for an activity.
-// taskToken - is the value of the binary "TaskToken" field of the "ActivityInfo" struct retrieved inside the activity.
-// details - is the progress you want to record along with heart beat for this activity.
-// The errors it can return:
-// - EntityNotExistsError
-// - InternalServiceError
-// - CanceledError
-func (r *rpc) RecordActivityHeartbeat(in RecordHeartbeatRequest, out *RecordHeartbeatResponse) error {
- details := &commonpb.Payloads{}
-
- if len(in.Details) != 0 {
- if err := proto.Unmarshal(in.Details, v1Proto.MessageV2(details)); err != nil {
- return err
- }
- }
-
- // find running activity
- ctx, err := r.srv.getPool().GetActivityContext(in.TaskToken)
- if err != nil {
- return err
- }
-
- activity.RecordHeartbeat(ctx, details)
-
- select {
- case <-ctx.Done():
- *out = RecordHeartbeatResponse{Canceled: true}
- default:
- *out = RecordHeartbeatResponse{Canceled: false}
- }
-
- return nil
-}
diff --git a/plugins/temporal/client/doc/doc.go b/plugins/temporal/client/doc/doc.go
deleted file mode 100644
index 10257070..00000000
--- a/plugins/temporal/client/doc/doc.go
+++ /dev/null
@@ -1 +0,0 @@
-package doc
diff --git a/plugins/temporal/client/doc/temporal.drawio b/plugins/temporal/client/doc/temporal.drawio
deleted file mode 100644
index f2350af8..00000000
--- a/plugins/temporal/client/doc/temporal.drawio
+++ /dev/null
@@ -1 +0,0 @@
-<mxfile host="Electron" modified="2020-10-20T11:17:09.390Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/13.7.9 Chrome/85.0.4183.121 Electron/10.1.3 Safari/537.36" etag="U-Sm3BmbD4KaqDkjKEOx" version="13.7.9" type="device"><diagram name="Page-1" id="6133507b-19e7-1e82-6fc7-422aa6c4b21f">7V1tc6M2EP41nsl1Jh5eDIaPsZPcdXptc83N5dpvspFtNRhRkGP7fn0lkDAgSEgCxrkoH2K0SEJon919VrwNzOl69zEC4ep37EF/YGjebmBeDgxD17UR/WGSPZdohpVKlhHyuOwguEU/oKjIpRvkwbhQkWDsExQWhXMcBHBOCjIQRXhbrLbAfvGoIVhCSXA7B74svUMeWaVSwzTtw45PEC1X4tC2xU95Bub3ywhvAn7AgWEukr909xqIzviZxivg4W1OZF4NzGmEMUm31rsp9NnsinlL213X7M0GHsGANGkwc0ezuW46c+A6GlycO3xYZC/mAnp0angRR2SFlzgA/tVBOklOF7IONVpakbVPN3W66YMZ9CfZjEyxjyO6K8ABaxYTEJELpq2S7Br5Pu8sKXN8OLQMA6/Ugkpy9WmJ19ZtWlzggFyDNfIZEL/ByAMB4GJRjc74RJ40Po8x3kRzPg9jx7V13XIXmg1dBzrnI5MjE0RLSGoq6eO0EpvFXNdcGR8hXkMS7WmFCPqAoIciAAHH8TKrx5vSSQD7XIUQo4DEuZ5vmIBWyGxyzAHHTdLR3CIuntmAbqRjEKXcyRxECdiaAU/nHuIB+Bs+CdpwqEtwJHBHiiiLYIx+gFlSgWGAzwWtbU0G1iWVAB8tAyqYU/1CisDJA4wIouZ+wXeskeclUK5B7MGKZfBUYkwCVI2dsYHAXUHn3KfxEyq4hQJgeKtzbWiNR1ZBV+cO91PPA5UEAqcIgXGpA7xYxBT4RVS8Gge2hAMFgqdAoA0dV3gagQG9FQhkdpl16x4BBO6biUIeAmsceF9XKOg1GDlGg2AkIlZ7waig8+dYuStb+XD4y89q6OJs2/D2zsjuxNKFSvaV7bvx9Zqk8FO1cxzCN2DgrnUqBi7ymyKd+2kNXCC5FQunarXaMWm95Ci0Ixi1GOZ7IHDtqV0b2q5b6YFfiQGjBAHjGH5dQsDAsH2m8LAAA/u/DVvnmKypT2P86YLu1cId/Z+4SS2VnxMcpvtGuX0MQeccEmwfR0XWJ91a8t/kyCgnAOuQoUQu/cr6WADqhIW42KTQI50VVD7KLJIkQvAVrkMcAT/XfFauTGVhWfaiKfPhguTmrG5S5OFWjWAVsSGIRTrRl17fbTdDZnU/QjL1EVX12QdaOpsn28NUNDCmLPTRgB19qDpB0R4HC7RM2qebNXW5yoNZzH6mEQQE3uHoHkZnBMT3XzaQgpudZ4SCZXpsHBKEg5hubZOKwz9TQTJWLkq7qBprcepLDrPs6xKbyFxiOncTTGst/ISvLBKW8ribm1K+gdhYtD/gtuSNCyQrW6RkhTleoznffopyRfg+W0DV+eFz3pj9XV8/Rokkx1rrQEduabGCF7eH9VtLrGes8ku35aD4EvZjmJZl2WPHAI4JbU+r8IBTH8RxANZQUm28RWsfJDOW00CiOV5JbxL9UkTMV8j3PoM93rDzo4R2fi9KkxWO0A/aLRDHyPNdwy7UuGUtuZaT0AxvhHayaA0/g5hkqPB9EMZolg04NesJJgSvBY74mTIKXRWUu0Jd7nC8UiOq/npQ2qMiKHVNXCfIwVK3q2BpOx3A0qgIzMwtLhBkV0iY8yX7UIaoTNfSWS7PqjzRJQ9VDdo4BHPqRD8ndS5HB8lffEKYaLtCBN6GIMmQthEIiw5vRclfkqJFmABSSyQndK6n2tBilJL6YIuFMVFOWCaN0SxE0PMDKNE7pCDfwvjglfOIqDX8pyHCIWHYzQAh6rWKB1Ph4dTwYBk94kG+BqPw0DMexk6PeJCvxUiq91GZuTQKwE/oPUvluaK/MhxcnusSGEwZDGaFkhPKcINjxPg4lUVp3YpVhmPrN7tZ4ikFd8EHxjX2TnteYe+M2foHZfa9wEI/YhyovWtB5S8qfykk1ZacVVenL3oL7kqCpaH4asuOqt7yTzKBkRExUog4OUQcM4WRhyvfWaA4a9sabpqUdBIEFGk9Wct3+owFI5kdKNL6DkmrbozLrLXxovuoC1wqjtK1p8pM/22w1pFaZz09RPTKWq2qW2YUa21Xw32yVoEuxVpPz/KPyVqluyUVLE4VFkddgi/jom6lU+Gif1yYR7wkK43Xlte3VJb7DrNcxy4muc1vLXONDmApUm6V0nTmqTLLfxtJ7rhqnValNO1quGnS2kVKI55IU5HofUciab2VhiKzGSz1URexyFELrl3Hosz230YscqouCitE9IuIXhdc3Sq+qthJuxruc8HVrQsCagWld8vvc8FVweJkYdHvPc+ufFFWpTPvMZ2x9JenMy28SUUGZtVTOIq8tumrMtt/G+mMrslLLwoSfUOi39uetaorhiqhaVfFfSY02YOCirqenu33mdHU3UyoYNE7LHp+ilNzJGiolOY9pjSa+9KbBfTy+35bQqZakO/86R2t7vXSJ5rU6OoWkhPERL9ZTcXb4FVW07qOe01rKt41ofjrqVj/MfOai5vb8fe7y4urL1eju+Cf6NO33+yKFylezAl6QGR/C6MH9vLKehr7FrhrKvod7AoVn8toH32tZxsE0ijxR4GBHC6yu4AK9NFswWdUAqPBCujhjbsaOrzYt9p9HN9nHNk/PAqRoqVIeHnUMl/DLisx05kvafDUZfp23mswJzgZusJN/7ipYKDHxU2D628iKN1grNzNacCmgtQeFzbNL8j8vPnKM4lJO5qveqVipeq74ibtXKh53z6iM3BULMkf1y+8/hObhzqfcWLmzMr/hYQmRWkmAjYEF3NiuEPke277b9YV1VNauhTpR1LYi0JAT/d7vpC2MixRPrRLSqLhy9KR/OePHqP8+S8g1Xx5pPPvH9Hi4Rut6RdTDp/CNa/+Bw==</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/temporal/client/plugin.go b/plugins/temporal/client/plugin.go
deleted file mode 100644
index 047a1815..00000000
--- a/plugins/temporal/client/plugin.go
+++ /dev/null
@@ -1,169 +0,0 @@
-package client
-
-import (
- "fmt"
- "os"
- "sync/atomic"
-
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pool"
- "github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- rrt "github.com/spiral/roadrunner/v2/plugins/temporal/protocol"
- "go.temporal.io/sdk/client"
- "go.temporal.io/sdk/converter"
- "go.temporal.io/sdk/worker"
-)
-
-// PluginName defines public service name.
-const PluginName = "temporal"
-
-// indicates that the case size was set
-var stickyCacheSet = false
-
-// Plugin implement Temporal contract.
-type Plugin struct {
- workerID int32
- cfg *Config
- dc converter.DataConverter
- log logger.Logger
- client client.Client
-}
-
-// Temporal define common interface for RoadRunner plugins.
-type Temporal interface {
- GetClient() client.Client
- GetDataConverter() converter.DataConverter
- GetConfig() Config
- GetCodec() rrt.Codec
- CreateWorker(taskQueue string, options worker.Options) (worker.Worker, error)
-}
-
-// Config of the temporal client and depended services.
-type Config struct {
- Address string
- Namespace string
- Activities *pool.Config
- Codec string
- DebugLevel int `mapstructure:"debug_level"`
- CacheSize int `mapstructure:"cache_size"`
-}
-
-// Init initiates temporal client plugin.
-func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
- const op = errors.Op("temporal_client_plugin_init")
- p.log = log
- p.dc = rrt.NewDataConverter(converter.GetDefaultDataConverter())
- err := cfg.UnmarshalKey(PluginName, &p.cfg)
- if err != nil {
- return errors.E(op, err)
- }
- if p.cfg == nil {
- return errors.E(op, errors.Disabled)
- }
-
- return nil
-}
-
-// GetConfig returns temporal configuration.
-func (p *Plugin) GetConfig() Config {
- if p.cfg != nil {
- return *p.cfg
- }
- // empty
- return Config{}
-}
-
-// GetCodec returns communication codec.
-func (p *Plugin) GetCodec() rrt.Codec {
- if p.cfg.Codec == "json" {
- return rrt.NewJSONCodec(rrt.DebugLevel(p.cfg.DebugLevel), p.log)
- }
-
- // production ready protocol, no debug abilities
- return rrt.NewProtoCodec()
-}
-
-// GetDataConverter returns data active data converter.
-func (p *Plugin) GetDataConverter() converter.DataConverter {
- return p.dc
-}
-
-// Serve starts temporal srv.
-func (p *Plugin) Serve() chan error {
- const op = errors.Op("temporal_client_plugin_serve")
- errCh := make(chan error, 1)
- var err error
-
- if stickyCacheSet == false && p.cfg.CacheSize != 0 {
- worker.SetStickyWorkflowCacheSize(p.cfg.CacheSize)
- stickyCacheSet = true
- }
-
- p.client, err = client.NewClient(client.Options{
- Logger: p.log,
- HostPort: p.cfg.Address,
- Namespace: p.cfg.Namespace,
- DataConverter: p.dc,
- })
-
- if err != nil {
- errCh <- errors.E(op, err)
- }
-
- p.log.Debug("connected to temporal server", "address", p.cfg.Address)
-
- return errCh
-}
-
-// Stop stops temporal srv connection.
-func (p *Plugin) Stop() error {
- if p.client != nil {
- p.client.Close()
- }
-
- return nil
-}
-
-// GetClient returns active srv connection.
-func (p *Plugin) GetClient() client.Client {
- return p.client
-}
-
-// CreateWorker allocates new temporal worker on an active connection.
-func (p *Plugin) CreateWorker(tq string, options worker.Options) (worker.Worker, error) {
- const op = errors.Op("temporal_client_plugin_create_worker")
- if p.client == nil {
- return nil, errors.E(op, errors.Str("unable to create worker, invalid temporal client"))
- }
-
- if options.Identity == "" {
- if tq == "" {
- tq = client.DefaultNamespace
- }
-
- // ensures unique worker IDs
- options.Identity = fmt.Sprintf(
- "%d@%s@%s@%v",
- os.Getpid(),
- getHostName(),
- tq,
- atomic.AddInt32(&p.workerID, 1),
- )
- }
-
- return worker.New(p.client, tq, options), nil
-}
-
-// Name of the service.
-func (p *Plugin) Name() string {
- return PluginName
-}
-
-func getHostName() string {
- hostName, err := os.Hostname()
- if err != nil {
- hostName = "Unknown"
- }
- return hostName
-}
diff --git a/plugins/temporal/protocol/converter.go b/plugins/temporal/protocol/converter.go
deleted file mode 100644
index 406e70f4..00000000
--- a/plugins/temporal/protocol/converter.go
+++ /dev/null
@@ -1,76 +0,0 @@
-package protocol
-
-import (
- commonpb "go.temporal.io/api/common/v1"
- "go.temporal.io/sdk/converter"
-)
-
-type (
- // DataConverter wraps Temporal data converter to enable direct access to the payloads.
- DataConverter struct {
- fallback converter.DataConverter
- }
-)
-
-// NewDataConverter creates new data converter.
-func NewDataConverter(fallback converter.DataConverter) converter.DataConverter {
- return &DataConverter{fallback: fallback}
-}
-
-// ToPayloads converts a list of values.
-func (r *DataConverter) ToPayloads(values ...interface{}) (*commonpb.Payloads, error) {
- for _, v := range values {
- if aggregated, ok := v.(*commonpb.Payloads); ok {
- // bypassing
- return aggregated, nil
- }
- }
-
- return r.fallback.ToPayloads(values...)
-}
-
-// ToPayload converts single value to payload.
-func (r *DataConverter) ToPayload(value interface{}) (*commonpb.Payload, error) {
- return r.fallback.ToPayload(value)
-}
-
-// FromPayloads converts to a list of values of different types.
-// Useful for deserializing arguments of function invocations.
-func (r *DataConverter) FromPayloads(payloads *commonpb.Payloads, valuePtrs ...interface{}) error {
- if payloads == nil {
- return nil
- }
-
- if len(valuePtrs) == 1 {
- // input proxying
- if input, ok := valuePtrs[0].(**commonpb.Payloads); ok {
- *input = &commonpb.Payloads{}
- (*input).Payloads = payloads.Payloads
- return nil
- }
- }
-
- for i := 0; i < len(payloads.Payloads); i++ {
- err := r.FromPayload(payloads.Payloads[i], valuePtrs[i])
- if err != nil {
- return err
- }
- }
-
- return nil
-}
-
-// FromPayload converts single value from payload.
-func (r *DataConverter) FromPayload(payload *commonpb.Payload, valuePtr interface{}) error {
- return r.fallback.FromPayload(payload, valuePtr)
-}
-
-// ToString converts payload object into human readable string.
-func (r *DataConverter) ToString(input *commonpb.Payload) string {
- return r.fallback.ToString(input)
-}
-
-// ToStrings converts payloads object into human readable strings.
-func (r *DataConverter) ToStrings(input *commonpb.Payloads) []string {
- return r.fallback.ToStrings(input)
-}
diff --git a/plugins/temporal/protocol/converter_test.go b/plugins/temporal/protocol/converter_test.go
deleted file mode 100644
index 6ce9fa0f..00000000
--- a/plugins/temporal/protocol/converter_test.go
+++ /dev/null
@@ -1,23 +0,0 @@
-package protocol
-
-import (
- "testing"
-
- "github.com/stretchr/testify/assert"
- "go.temporal.io/api/common/v1"
- "go.temporal.io/sdk/converter"
-)
-
-func Test_Passthough(t *testing.T) {
- codec := NewDataConverter(converter.GetDefaultDataConverter())
-
- value, err := codec.ToPayloads("test")
- assert.NoError(t, err)
-
- out := &common.Payloads{}
-
- assert.Len(t, out.Payloads, 0)
- assert.NoError(t, codec.FromPayloads(value, &out))
-
- assert.Len(t, out.Payloads, 1)
-}
diff --git a/plugins/temporal/protocol/internal/protocol.pb.go b/plugins/temporal/protocol/internal/protocol.pb.go
deleted file mode 100644
index c554e28f..00000000
--- a/plugins/temporal/protocol/internal/protocol.pb.go
+++ /dev/null
@@ -1,167 +0,0 @@
-// Code generated by protoc-gen-go. DO NOT EDIT.
-// source: protocol.proto
-
-package internal
-
-import (
- fmt "fmt"
- math "math"
-
- proto "github.com/golang/protobuf/proto"
- v11 "go.temporal.io/api/common/v1"
- v1 "go.temporal.io/api/failure/v1"
-)
-
-// Reference imports to suppress errors if they are not otherwise used.
-var _ = proto.Marshal
-var _ = fmt.Errorf
-var _ = math.Inf
-
-// This is a compile-time assertion to ensure that this generated file
-// is compatible with the proto package it is being compiled against.
-// A compilation error at this line likely means your copy of the
-// proto package needs to be updated.
-const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
-
-type Frame struct {
- Messages []*Message `protobuf:"bytes,1,rep,name=messages,proto3" json:"messages,omitempty"`
- XXX_NoUnkeyedLiteral struct{} `json:"-"`
- XXX_unrecognized []byte `json:"-"`
- XXX_sizecache int32 `json:"-"`
-}
-
-func (m *Frame) Reset() { *m = Frame{} }
-func (m *Frame) String() string { return proto.CompactTextString(m) }
-func (*Frame) ProtoMessage() {}
-func (*Frame) Descriptor() ([]byte, []int) {
- return fileDescriptor_2bc2336598a3f7e0, []int{0}
-}
-
-func (m *Frame) XXX_Unmarshal(b []byte) error {
- return xxx_messageInfo_Frame.Unmarshal(m, b)
-}
-func (m *Frame) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
- return xxx_messageInfo_Frame.Marshal(b, m, deterministic)
-}
-func (m *Frame) XXX_Merge(src proto.Message) {
- xxx_messageInfo_Frame.Merge(m, src)
-}
-func (m *Frame) XXX_Size() int {
- return xxx_messageInfo_Frame.Size(m)
-}
-func (m *Frame) XXX_DiscardUnknown() {
- xxx_messageInfo_Frame.DiscardUnknown(m)
-}
-
-var xxx_messageInfo_Frame proto.InternalMessageInfo
-
-func (m *Frame) GetMessages() []*Message {
- if m != nil {
- return m.Messages
- }
- return nil
-}
-
-// Single communication message.
-type Message struct {
- Id uint64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"`
- // command name (if any)
- Command string `protobuf:"bytes,2,opt,name=command,proto3" json:"command,omitempty"`
- // command options in json format.
- Options []byte `protobuf:"bytes,3,opt,name=options,proto3" json:"options,omitempty"`
- // error response.
- Failure *v1.Failure `protobuf:"bytes,4,opt,name=failure,proto3" json:"failure,omitempty"`
- // invocation or result payloads.
- Payloads *v11.Payloads `protobuf:"bytes,5,opt,name=payloads,proto3" json:"payloads,omitempty"`
- XXX_NoUnkeyedLiteral struct{} `json:"-"`
- XXX_unrecognized []byte `json:"-"`
- XXX_sizecache int32 `json:"-"`
-}
-
-func (m *Message) Reset() { *m = Message{} }
-func (m *Message) String() string { return proto.CompactTextString(m) }
-func (*Message) ProtoMessage() {}
-func (*Message) Descriptor() ([]byte, []int) {
- return fileDescriptor_2bc2336598a3f7e0, []int{1}
-}
-
-func (m *Message) XXX_Unmarshal(b []byte) error {
- return xxx_messageInfo_Message.Unmarshal(m, b)
-}
-func (m *Message) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
- return xxx_messageInfo_Message.Marshal(b, m, deterministic)
-}
-func (m *Message) XXX_Merge(src proto.Message) {
- xxx_messageInfo_Message.Merge(m, src)
-}
-func (m *Message) XXX_Size() int {
- return xxx_messageInfo_Message.Size(m)
-}
-func (m *Message) XXX_DiscardUnknown() {
- xxx_messageInfo_Message.DiscardUnknown(m)
-}
-
-var xxx_messageInfo_Message proto.InternalMessageInfo
-
-func (m *Message) GetId() uint64 {
- if m != nil {
- return m.Id
- }
- return 0
-}
-
-func (m *Message) GetCommand() string {
- if m != nil {
- return m.Command
- }
- return ""
-}
-
-func (m *Message) GetOptions() []byte {
- if m != nil {
- return m.Options
- }
- return nil
-}
-
-func (m *Message) GetFailure() *v1.Failure {
- if m != nil {
- return m.Failure
- }
- return nil
-}
-
-func (m *Message) GetPayloads() *v11.Payloads {
- if m != nil {
- return m.Payloads
- }
- return nil
-}
-
-func init() {
- proto.RegisterType((*Frame)(nil), "temporal.roadrunner.internal.Frame")
- proto.RegisterType((*Message)(nil), "temporal.roadrunner.internal.Message")
-}
-
-func init() { proto.RegisterFile("protocol.proto", fileDescriptor_2bc2336598a3f7e0) }
-
-var fileDescriptor_2bc2336598a3f7e0 = []byte{
- // 257 bytes of a gzipped FileDescriptorProto
- 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x90, 0x41, 0x4b, 0x33, 0x31,
- 0x10, 0x86, 0xc9, 0xb6, 0xfd, 0xb6, 0x5f, 0x2a, 0x3d, 0xe4, 0x14, 0x8a, 0x87, 0x50, 0x2c, 0xec,
- 0x29, 0x4b, 0xeb, 0x4d, 0xbc, 0x78, 0xe9, 0x41, 0x10, 0x24, 0xff, 0x60, 0x6c, 0xa2, 0x04, 0x92,
- 0x4c, 0x48, 0xb6, 0x05, 0x7f, 0xa1, 0x7f, 0x4b, 0xb6, 0x9b, 0xad, 0x88, 0xe2, 0x6d, 0x26, 0xef,
- 0xfb, 0x84, 0x87, 0xa1, 0xcb, 0x98, 0xb0, 0xc3, 0x03, 0x3a, 0x79, 0x1e, 0xd8, 0x75, 0x67, 0x7c,
- 0xc4, 0x04, 0x4e, 0x26, 0x04, 0x9d, 0x8e, 0x21, 0x98, 0x24, 0x6d, 0xe8, 0x4c, 0x0a, 0xe0, 0x56,
- 0x37, 0x63, 0xda, 0x42, 0xb4, 0xed, 0x01, 0xbd, 0xc7, 0xd0, 0x9e, 0xb6, 0xad, 0x37, 0x39, 0xc3,
- 0x9b, 0x19, 0xfe, 0x58, 0x6d, 0xbe, 0xb5, 0x5e, 0xc1, 0xba, 0x63, 0x32, 0x3f, 0x6a, 0xeb, 0x47,
- 0x3a, 0xdb, 0x27, 0xf0, 0x86, 0x3d, 0xd0, 0x79, 0x49, 0x32, 0x27, 0x62, 0xd2, 0x2c, 0x76, 0x1b,
- 0xf9, 0x97, 0x86, 0x7c, 0x1a, 0xda, 0xea, 0x82, 0xad, 0x3f, 0x08, 0xad, 0xcb, 0x2b, 0x5b, 0xd2,
- 0xca, 0x6a, 0x4e, 0x04, 0x69, 0xa6, 0xaa, 0xb2, 0x9a, 0x71, 0x5a, 0xf7, 0xa6, 0x10, 0x34, 0xaf,
- 0x04, 0x69, 0xfe, 0xab, 0x71, 0xed, 0x13, 0x8c, 0x9d, 0xc5, 0x90, 0xf9, 0x44, 0x90, 0xe6, 0x4a,
- 0x8d, 0x2b, 0xbb, 0xa3, 0x75, 0xf1, 0xe6, 0x53, 0x41, 0x9a, 0xc5, 0x4e, 0x7c, 0x19, 0x41, 0xb4,
- 0xb2, 0x84, 0xf2, 0xb4, 0x95, 0xfb, 0x61, 0x54, 0x23, 0xc0, 0xee, 0xe9, 0x3c, 0xc2, 0xbb, 0x43,
- 0xd0, 0x99, 0xcf, 0x7e, 0x83, 0x87, 0xbb, 0xf5, 0xec, 0x73, 0xe9, 0xa9, 0x0b, 0xf1, 0xf2, 0xef,
- 0x7c, 0x9c, 0xdb, 0xcf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x59, 0xb1, 0x79, 0xd4, 0x99, 0x01, 0x00,
- 0x00,
-}
diff --git a/plugins/temporal/protocol/json_codec.go b/plugins/temporal/protocol/json_codec.go
deleted file mode 100644
index e7a77068..00000000
--- a/plugins/temporal/protocol/json_codec.go
+++ /dev/null
@@ -1,225 +0,0 @@
-package protocol
-
-import (
- "github.com/fatih/color"
- j "github.com/json-iterator/go"
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- "go.temporal.io/api/common/v1"
- "go.temporal.io/api/failure/v1"
-)
-
-var json = j.ConfigCompatibleWithStandardLibrary
-
-// JSONCodec can be used for debugging and log capturing reasons.
-type JSONCodec struct {
- // level enables verbose logging or all incoming and outcoming messages.
- level DebugLevel
-
- // logger renders messages when debug enabled.
- logger logger.Logger
-}
-
-// jsonFrame contains message command in binary form.
-type jsonFrame struct {
- // ID contains ID of the command, response or error.
- ID uint64 `json:"id"`
-
- // Command name. Optional.
- Command string `json:"command,omitempty"`
-
- // Options to be unmarshalled to body (raw payload).
- Options j.RawMessage `json:"options,omitempty"`
-
- // Failure associated with command id.
- Failure []byte `json:"failure,omitempty"`
-
- // Payloads specific to the command or result.
- Payloads []byte `json:"payloads,omitempty"`
-}
-
-// NewJSONCodec creates new Json communication codec.
-func NewJSONCodec(level DebugLevel, logger logger.Logger) Codec {
- return &JSONCodec{
- level: level,
- logger: logger,
- }
-}
-
-// WithLogger creates new codes instance with attached logger.
-func (c *JSONCodec) WithLogger(logger logger.Logger) Codec {
- return &JSONCodec{
- level: c.level,
- logger: logger,
- }
-}
-
-// GetName returns codec name.
-func (c *JSONCodec) GetName() string {
- return "json"
-}
-
-// Execute exchanges commands with worker.
-func (c *JSONCodec) Execute(e Endpoint, ctx Context, msg ...Message) ([]Message, error) {
- const op = errors.Op("json_codec_execute")
- if len(msg) == 0 {
- return nil, nil
- }
-
- var response = make([]jsonFrame, 0, 5)
- var result = make([]Message, 0, 5)
- var err error
-
- frames := make([]jsonFrame, 0, len(msg))
- for _, m := range msg {
- frame, err := c.packFrame(m)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- frames = append(frames, frame)
- }
-
- p := payload.Payload{}
-
- if ctx.IsEmpty() {
- p.Context = []byte("null")
- }
-
- p.Context, err = json.Marshal(ctx)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- p.Body, err = json.Marshal(frames)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- if c.level >= DebugNormal {
- logMessage := string(p.Body) + " " + string(p.Context)
- if c.level >= DebugHumanized {
- logMessage = color.GreenString(logMessage)
- }
-
- c.logger.Debug(logMessage)
- }
-
- out, err := e.Exec(p)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- if len(out.Body) == 0 {
- // worker inactive or closed
- return nil, nil
- }
-
- if c.level >= DebugNormal {
- logMessage := string(out.Body)
- if c.level >= DebugHumanized {
- logMessage = color.HiYellowString(logMessage)
- }
-
- c.logger.Debug(logMessage, "receive", true)
- }
-
- err = json.Unmarshal(out.Body, &response)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- for _, f := range response {
- msg, err := c.parseFrame(f)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- result = append(result, msg)
- }
-
- return result, nil
-}
-
-func (c *JSONCodec) packFrame(msg Message) (jsonFrame, error) {
- var (
- err error
- frame jsonFrame
- )
-
- frame.ID = msg.ID
-
- if msg.Payloads != nil {
- frame.Payloads, err = msg.Payloads.Marshal()
- if err != nil {
- return jsonFrame{}, err
- }
- }
-
- if msg.Failure != nil {
- frame.Failure, err = msg.Failure.Marshal()
- if err != nil {
- return jsonFrame{}, err
- }
- }
-
- if msg.Command == nil {
- return frame, nil
- }
-
- frame.Command, err = commandName(msg.Command)
- if err != nil {
- return jsonFrame{}, err
- }
-
- frame.Options, err = json.Marshal(msg.Command)
- if err != nil {
- return jsonFrame{}, err
- }
-
- return frame, nil
-}
-
-func (c *JSONCodec) parseFrame(frame jsonFrame) (Message, error) {
- var (
- err error
- msg Message
- )
-
- msg.ID = frame.ID
-
- if frame.Payloads != nil {
- msg.Payloads = &common.Payloads{}
-
- err = msg.Payloads.Unmarshal(frame.Payloads)
- if err != nil {
- return Message{}, err
- }
- }
-
- if frame.Failure != nil {
- msg.Failure = &failure.Failure{}
-
- err = msg.Failure.Unmarshal(frame.Failure)
- if err != nil {
- return Message{}, err
- }
- }
-
- if frame.Command != "" {
- cmd, err := initCommand(frame.Command)
- if err != nil {
- return Message{}, err
- }
-
- err = json.Unmarshal(frame.Options, &cmd)
- if err != nil {
- return Message{}, err
- }
-
- msg.Command = cmd
- }
-
- return msg, nil
-}
diff --git a/plugins/temporal/protocol/message.go b/plugins/temporal/protocol/message.go
deleted file mode 100644
index d5e0f49d..00000000
--- a/plugins/temporal/protocol/message.go
+++ /dev/null
@@ -1,334 +0,0 @@
-package protocol
-
-import (
- "time"
-
- "github.com/spiral/errors"
- commonpb "go.temporal.io/api/common/v1"
- "go.temporal.io/sdk/activity"
- bindings "go.temporal.io/sdk/internalbindings"
- "go.temporal.io/sdk/workflow"
-)
-
-const (
- getWorkerInfoCommand = "GetWorkerInfo"
-
- invokeActivityCommand = "InvokeActivity"
- startWorkflowCommand = "StartWorkflow"
- invokeSignalCommand = "InvokeSignal"
- invokeQueryCommand = "InvokeQuery"
- destroyWorkflowCommand = "DestroyWorkflow"
- cancelWorkflowCommand = "CancelWorkflow"
- getStackTraceCommand = "StackTrace"
-
- executeActivityCommand = "ExecuteActivity"
- executeChildWorkflowCommand = "ExecuteChildWorkflow"
- getChildWorkflowExecutionCommand = "GetChildWorkflowExecution"
-
- newTimerCommand = "NewTimer"
- sideEffectCommand = "SideEffect"
- getVersionCommand = "GetVersion"
- completeWorkflowCommand = "CompleteWorkflow"
- continueAsNewCommand = "ContinueAsNew"
-
- signalExternalWorkflowCommand = "SignalExternalWorkflow"
- cancelExternalWorkflowCommand = "CancelExternalWorkflow"
-
- cancelCommand = "Cancel"
- panicCommand = "Panic"
-)
-
-// GetWorkerInfo reads worker information.
-type GetWorkerInfo struct{}
-
-// InvokeActivity invokes activity.
-type InvokeActivity struct {
- // Name defines activity name.
- Name string `json:"name"`
-
- // Info contains execution context.
- Info activity.Info `json:"info"`
-
- // HeartbeatDetails indicates that the payload also contains last heartbeat details.
- HeartbeatDetails int `json:"heartbeatDetails,omitempty"`
-}
-
-// StartWorkflow sends worker command to start workflow.
-type StartWorkflow struct {
- // Info to define workflow context.
- Info *workflow.Info `json:"info"`
-
- // LastCompletion contains offset of last completion results.
- LastCompletion int `json:"lastCompletion,omitempty"`
-}
-
-// InvokeSignal invokes signal with a set of arguments.
-type InvokeSignal struct {
- // RunID workflow run id.
- RunID string `json:"runId"`
-
- // Name of the signal.
- Name string `json:"name"`
-}
-
-// InvokeQuery invokes query with a set of arguments.
-type InvokeQuery struct {
- // RunID workflow run id.
- RunID string `json:"runId"`
- // Name of the query.
- Name string `json:"name"`
-}
-
-// CancelWorkflow asks worker to gracefully stop workflow, if possible (signal).
-type CancelWorkflow struct {
- // RunID workflow run id.
- RunID string `json:"runId"`
-}
-
-// DestroyWorkflow asks worker to offload workflow from memory.
-type DestroyWorkflow struct {
- // RunID workflow run id.
- RunID string `json:"runId"`
-}
-
-// GetStackTrace asks worker to offload workflow from memory.
-type GetStackTrace struct {
- // RunID workflow run id.
- RunID string `json:"runId"`
-}
-
-// ExecuteActivity command by workflow worker.
-type ExecuteActivity struct {
- // Name defines activity name.
- Name string `json:"name"`
- // Options to run activity.
- Options bindings.ExecuteActivityOptions `json:"options,omitempty"`
-}
-
-// ExecuteChildWorkflow executes child workflow.
-type ExecuteChildWorkflow struct {
- // Name defines workflow name.
- Name string `json:"name"`
- // Options to run activity.
- Options bindings.WorkflowOptions `json:"options,omitempty"`
-}
-
-// GetChildWorkflowExecution returns the WorkflowID and RunId of child workflow.
-type GetChildWorkflowExecution struct {
- // ID of child workflow command.
- ID uint64 `json:"id"`
-}
-
-// NewTimer starts new timer.
-type NewTimer struct {
- // Milliseconds defines timer duration.
- Milliseconds int `json:"ms"`
-}
-
-// SideEffect to be recorded into the history.
-type SideEffect struct{}
-
-// GetVersion requests version marker.
-type GetVersion struct {
- ChangeID string `json:"changeID"`
- MinSupported int `json:"minSupported"`
- MaxSupported int `json:"maxSupported"`
-}
-
-// CompleteWorkflow sent by worker to complete workflow. Might include additional error as part of the payload.
-type CompleteWorkflow struct{}
-
-// ContinueAsNew restarts workflow with new running instance.
-type ContinueAsNew struct {
- // Result defines workflow execution result.
- Name string `json:"name"`
-
- // Options for continued as new workflow.
- Options struct {
- TaskQueueName string
- WorkflowExecutionTimeout time.Duration
- WorkflowRunTimeout time.Duration
- WorkflowTaskTimeout time.Duration
- } `json:"options"`
-}
-
-// SignalExternalWorkflow sends signal to external workflow.
-type SignalExternalWorkflow struct {
- Namespace string `json:"namespace"`
- WorkflowID string `json:"workflowID"`
- RunID string `json:"runID"`
- Signal string `json:"signal"`
- ChildWorkflowOnly bool `json:"childWorkflowOnly"`
-}
-
-// CancelExternalWorkflow canceller external workflow.
-type CancelExternalWorkflow struct {
- Namespace string `json:"namespace"`
- WorkflowID string `json:"workflowID"`
- RunID string `json:"runID"`
-}
-
-// Cancel one or multiple internal promises (activities, local activities, timers, child workflows).
-type Cancel struct {
- // CommandIDs to be cancelled.
- CommandIDs []uint64 `json:"ids"`
-}
-
-// Panic triggers panic in workflow process.
-type Panic struct {
- // Message to include into the error.
- Message string `json:"message"`
-}
-
-// ActivityParams maps activity command to activity params.
-func (cmd ExecuteActivity) ActivityParams(env bindings.WorkflowEnvironment, payloads *commonpb.Payloads) bindings.ExecuteActivityParams {
- params := bindings.ExecuteActivityParams{
- ExecuteActivityOptions: cmd.Options,
- ActivityType: bindings.ActivityType{Name: cmd.Name},
- Input: payloads,
- }
-
- if params.TaskQueueName == "" {
- params.TaskQueueName = env.WorkflowInfo().TaskQueueName
- }
-
- return params
-}
-
-// WorkflowParams maps workflow command to workflow params.
-func (cmd ExecuteChildWorkflow) WorkflowParams(env bindings.WorkflowEnvironment, payloads *commonpb.Payloads) bindings.ExecuteWorkflowParams {
- params := bindings.ExecuteWorkflowParams{
- WorkflowOptions: cmd.Options,
- WorkflowType: &bindings.WorkflowType{Name: cmd.Name},
- Input: payloads,
- }
-
- if params.TaskQueueName == "" {
- params.TaskQueueName = env.WorkflowInfo().TaskQueueName
- }
-
- return params
-}
-
-// ToDuration converts timer command to time.Duration.
-func (cmd NewTimer) ToDuration() time.Duration {
- return time.Millisecond * time.Duration(cmd.Milliseconds)
-}
-
-// returns command name (only for the commands sent to the worker)
-func commandName(cmd interface{}) (string, error) {
- const op = errors.Op("command_name")
- switch cmd.(type) {
- case GetWorkerInfo, *GetWorkerInfo:
- return getWorkerInfoCommand, nil
- case StartWorkflow, *StartWorkflow:
- return startWorkflowCommand, nil
- case InvokeSignal, *InvokeSignal:
- return invokeSignalCommand, nil
- case InvokeQuery, *InvokeQuery:
- return invokeQueryCommand, nil
- case DestroyWorkflow, *DestroyWorkflow:
- return destroyWorkflowCommand, nil
- case CancelWorkflow, *CancelWorkflow:
- return cancelWorkflowCommand, nil
- case GetStackTrace, *GetStackTrace:
- return getStackTraceCommand, nil
- case InvokeActivity, *InvokeActivity:
- return invokeActivityCommand, nil
- case ExecuteActivity, *ExecuteActivity:
- return executeActivityCommand, nil
- case ExecuteChildWorkflow, *ExecuteChildWorkflow:
- return executeChildWorkflowCommand, nil
- case GetChildWorkflowExecution, *GetChildWorkflowExecution:
- return getChildWorkflowExecutionCommand, nil
- case NewTimer, *NewTimer:
- return newTimerCommand, nil
- case GetVersion, *GetVersion:
- return getVersionCommand, nil
- case SideEffect, *SideEffect:
- return sideEffectCommand, nil
- case CompleteWorkflow, *CompleteWorkflow:
- return completeWorkflowCommand, nil
- case ContinueAsNew, *ContinueAsNew:
- return continueAsNewCommand, nil
- case SignalExternalWorkflow, *SignalExternalWorkflow:
- return signalExternalWorkflowCommand, nil
- case CancelExternalWorkflow, *CancelExternalWorkflow:
- return cancelExternalWorkflowCommand, nil
- case Cancel, *Cancel:
- return cancelCommand, nil
- case Panic, *Panic:
- return panicCommand, nil
- default:
- return "", errors.E(op, errors.Errorf("undefined command type: %s", cmd))
- }
-}
-
-// reads command from binary payload
-func initCommand(name string) (interface{}, error) {
- const op = errors.Op("init_command")
- switch name {
- case getWorkerInfoCommand:
- return &GetWorkerInfo{}, nil
-
- case startWorkflowCommand:
- return &StartWorkflow{}, nil
-
- case invokeSignalCommand:
- return &InvokeSignal{}, nil
-
- case invokeQueryCommand:
- return &InvokeQuery{}, nil
-
- case destroyWorkflowCommand:
- return &DestroyWorkflow{}, nil
-
- case cancelWorkflowCommand:
- return &CancelWorkflow{}, nil
-
- case getStackTraceCommand:
- return &GetStackTrace{}, nil
-
- case invokeActivityCommand:
- return &InvokeActivity{}, nil
-
- case executeActivityCommand:
- return &ExecuteActivity{}, nil
-
- case executeChildWorkflowCommand:
- return &ExecuteChildWorkflow{}, nil
-
- case getChildWorkflowExecutionCommand:
- return &GetChildWorkflowExecution{}, nil
-
- case newTimerCommand:
- return &NewTimer{}, nil
-
- case getVersionCommand:
- return &GetVersion{}, nil
-
- case sideEffectCommand:
- return &SideEffect{}, nil
-
- case completeWorkflowCommand:
- return &CompleteWorkflow{}, nil
-
- case continueAsNewCommand:
- return &ContinueAsNew{}, nil
-
- case signalExternalWorkflowCommand:
- return &SignalExternalWorkflow{}, nil
-
- case cancelExternalWorkflowCommand:
- return &CancelExternalWorkflow{}, nil
-
- case cancelCommand:
- return &Cancel{}, nil
-
- case panicCommand:
- return &Panic{}, nil
-
- default:
- return nil, errors.E(op, errors.Errorf("undefined command name: %s", name))
- }
-}
diff --git a/plugins/temporal/protocol/proto_codec.go b/plugins/temporal/protocol/proto_codec.go
deleted file mode 100644
index 607fe0fe..00000000
--- a/plugins/temporal/protocol/proto_codec.go
+++ /dev/null
@@ -1,145 +0,0 @@
-package protocol
-
-import (
- v1 "github.com/golang/protobuf/proto" //nolint:staticcheck
- jsoniter "github.com/json-iterator/go"
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/spiral/roadrunner/v2/plugins/temporal/protocol/internal"
- "google.golang.org/protobuf/proto"
-)
-
-type (
- // ProtoCodec uses protobuf to exchange messages with underlying workers.
- ProtoCodec struct {
- }
-)
-
-// NewProtoCodec creates new Proto communication codec.
-func NewProtoCodec() Codec {
- return &ProtoCodec{}
-}
-
-// WithLogger creates new codes instance with attached logger.
-func (c *ProtoCodec) WithLogger(logger logger.Logger) Codec {
- return &ProtoCodec{}
-}
-
-// GetName returns codec name.
-func (c *ProtoCodec) GetName() string {
- return "protobuf"
-}
-
-// Execute exchanges commands with worker.
-func (c *ProtoCodec) Execute(e Endpoint, ctx Context, msg ...Message) ([]Message, error) {
- if len(msg) == 0 {
- return nil, nil
- }
-
- var request = &internal.Frame{}
- var response = &internal.Frame{}
- var result = make([]Message, 0, 5)
- var err error
-
- for _, m := range msg {
- frame, err := c.packMessage(m)
- if err != nil {
- return nil, err
- }
-
- request.Messages = append(request.Messages, frame)
- }
-
- p := payload.Payload{}
-
- // context is always in json format
- if ctx.IsEmpty() {
- p.Context = []byte("null")
- }
-
- p.Context, err = jsoniter.Marshal(ctx)
- if err != nil {
- return nil, errors.E(errors.Op("encodeContext"), err)
- }
-
- p.Body, err = proto.Marshal(v1.MessageV2(request))
- if err != nil {
- return nil, errors.E(errors.Op("encodePayload"), err)
- }
-
- out, err := e.Exec(p)
- if err != nil {
- return nil, errors.E(errors.Op("execute"), err)
- }
-
- if len(out.Body) == 0 {
- // worker inactive or closed
- return nil, nil
- }
-
- err = proto.Unmarshal(out.Body, v1.MessageV2(response))
- if err != nil {
- return nil, errors.E(errors.Op("parseResponse"), err)
- }
-
- for _, f := range response.Messages {
- msg, err := c.parseMessage(f)
- if err != nil {
- return nil, err
- }
-
- result = append(result, msg)
- }
-
- return result, nil
-}
-
-func (c *ProtoCodec) packMessage(msg Message) (*internal.Message, error) {
- var err error
-
- frame := &internal.Message{
- Id: msg.ID,
- Payloads: msg.Payloads,
- Failure: msg.Failure,
- }
-
- if msg.Command != nil {
- frame.Command, err = commandName(msg.Command)
- if err != nil {
- return nil, err
- }
-
- frame.Options, err = jsoniter.Marshal(msg.Command)
- if err != nil {
- return nil, err
- }
- }
-
- return frame, nil
-}
-
-func (c *ProtoCodec) parseMessage(frame *internal.Message) (Message, error) {
- const op = errors.Op("proto_codec_parse_message")
- var err error
-
- msg := Message{
- ID: frame.Id,
- Payloads: frame.Payloads,
- Failure: frame.Failure,
- }
-
- if frame.Command != "" {
- msg.Command, err = initCommand(frame.Command)
- if err != nil {
- return Message{}, errors.E(op, err)
- }
-
- err = jsoniter.Unmarshal(frame.Options, &msg.Command)
- if err != nil {
- return Message{}, errors.E(op, err)
- }
- }
-
- return msg, nil
-}
diff --git a/plugins/temporal/protocol/protocol.go b/plugins/temporal/protocol/protocol.go
deleted file mode 100644
index 53076fdf..00000000
--- a/plugins/temporal/protocol/protocol.go
+++ /dev/null
@@ -1,77 +0,0 @@
-package protocol
-
-import (
- "github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- commonpb "go.temporal.io/api/common/v1"
- "go.temporal.io/api/failure/v1"
-)
-
-const (
- // DebugNone disables all debug messages.
- DebugNone = iota
-
- // DebugNormal renders all messages into console.
- DebugNormal
-
- // DebugHumanized enables color highlights for messages.
- DebugHumanized
-)
-
-// Context provides worker information about currently. Context can be empty for server level commands.
-type Context struct {
- // TaskQueue associates message batch with the specific task queue in underlying worker.
- TaskQueue string `json:"taskQueue,omitempty"`
-
- // TickTime associated current or historical time with message batch.
- TickTime string `json:"tickTime,omitempty"`
-
- // Replay indicates that current message batch is historical.
- Replay bool `json:"replay,omitempty"`
-}
-
-// Message used to exchange the send commands and receive responses from underlying workers.
-type Message struct {
- // ID contains ID of the command, response or error.
- ID uint64 `json:"id"`
-
- // Command of the message in unmarshalled form. Pointer.
- Command interface{} `json:"command,omitempty"`
-
- // Failure associated with command id.
- Failure *failure.Failure `json:"failure,omitempty"`
-
- // Payloads contains message specific payloads in binary format.
- Payloads *commonpb.Payloads `json:"payloads,omitempty"`
-}
-
-// Codec manages payload encoding and decoding while communication with underlying worker.
-type Codec interface {
- // WithLogger creates new codes instance with attached logger.
- WithLogger(logger.Logger) Codec
-
- // GetName returns codec name.
- GetName() string
-
- // Execute sends message to worker and waits for the response.
- Execute(e Endpoint, ctx Context, msg ...Message) ([]Message, error)
-}
-
-// Endpoint provides the ability to send and receive messages.
-type Endpoint interface {
- // ExecWithContext allow to set ExecTTL
- Exec(p payload.Payload) (payload.Payload, error)
-}
-
-// DebugLevel configures debug level.
-type DebugLevel int
-
-// IsEmpty only check if task queue set.
-func (ctx Context) IsEmpty() bool {
- return ctx.TaskQueue == ""
-}
-
-// IsCommand returns true if message carries request.
-func (msg Message) IsCommand() bool {
- return msg.Command != nil
-}
diff --git a/plugins/temporal/protocol/worker_info.go b/plugins/temporal/protocol/worker_info.go
deleted file mode 100644
index 58a0ae66..00000000
--- a/plugins/temporal/protocol/worker_info.go
+++ /dev/null
@@ -1,72 +0,0 @@
-package protocol
-
-import (
- "github.com/spiral/errors"
- "go.temporal.io/sdk/converter"
- "go.temporal.io/sdk/worker"
-)
-
-// WorkerInfo outlines information about every available worker and it's TaskQueues.
-
-// WorkerInfo lists available task queues, workflows and activities.
-type WorkerInfo struct {
- // TaskQueue assigned to the worker.
- TaskQueue string `json:"taskQueue"`
-
- // Options describe worker options.
- Options worker.Options `json:"options,omitempty"`
-
- // Workflows provided by the worker.
- Workflows []WorkflowInfo
-
- // Activities provided by the worker.
- Activities []ActivityInfo
-}
-
-// WorkflowInfo describes single worker workflow.
-type WorkflowInfo struct {
- // Name of the workflow.
- Name string `json:"name"`
-
- // Queries pre-defined for the workflow type.
- Queries []string `json:"queries"`
-
- // Signals pre-defined for the workflow type.
- Signals []string `json:"signals"`
-}
-
-// ActivityInfo describes single worker activity.
-type ActivityInfo struct {
- // Name describes public activity name.
- Name string `json:"name"`
-}
-
-// FetchWorkerInfo fetches information about all underlying workers (can be multiplexed inside single process).
-func FetchWorkerInfo(c Codec, e Endpoint, dc converter.DataConverter) ([]WorkerInfo, error) {
- const op = errors.Op("fetch_worker_info")
-
- result, err := c.Execute(e, Context{}, Message{ID: 0, Command: GetWorkerInfo{}})
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- if len(result) != 1 {
- return nil, errors.E(op, errors.Str("unable to read worker info"))
- }
-
- if result[0].ID != 0 {
- return nil, errors.E(op, errors.Str("FetchWorkerInfo confirmation missing"))
- }
-
- var info []WorkerInfo
- for i := range result[0].Payloads.Payloads {
- wi := WorkerInfo{}
- if err := dc.FromPayload(result[0].Payloads.Payloads[i], &wi); err != nil {
- return nil, errors.E(op, err)
- }
-
- info = append(info, wi)
- }
-
- return info, nil
-}
diff --git a/plugins/temporal/workflow/canceller.go b/plugins/temporal/workflow/canceller.go
deleted file mode 100644
index 962c527f..00000000
--- a/plugins/temporal/workflow/canceller.go
+++ /dev/null
@@ -1,41 +0,0 @@
-package workflow
-
-import (
- "sync"
-)
-
-type cancellable func() error
-
-type canceller struct {
- ids sync.Map
-}
-
-func (c *canceller) register(id uint64, cancel cancellable) {
- c.ids.Store(id, cancel)
-}
-
-func (c *canceller) discard(id uint64) {
- c.ids.Delete(id)
-}
-
-func (c *canceller) cancel(ids ...uint64) error {
- var err error
- for _, id := range ids {
- cancel, ok := c.ids.Load(id)
- if ok == false {
- continue
- }
-
- // TODO return when minimum supported version will be go 1.15
- // go1.14 don't have LoadAndDelete method
- // It was introduced only in go1.15
- c.ids.Delete(id)
-
- err = cancel.(cancellable)()
- if err != nil {
- return err
- }
- }
-
- return nil
-}
diff --git a/plugins/temporal/workflow/canceller_test.go b/plugins/temporal/workflow/canceller_test.go
deleted file mode 100644
index d6e846f8..00000000
--- a/plugins/temporal/workflow/canceller_test.go
+++ /dev/null
@@ -1,33 +0,0 @@
-package workflow
-
-import (
- "errors"
- "testing"
-
- "github.com/stretchr/testify/assert"
-)
-
-func Test_CancellerNoListeners(t *testing.T) {
- c := &canceller{}
-
- assert.NoError(t, c.cancel(1))
-}
-
-func Test_CancellerListenerError(t *testing.T) {
- c := &canceller{}
- c.register(1, func() error {
- return errors.New("failed")
- })
-
- assert.Error(t, c.cancel(1))
-}
-
-func Test_CancellerListenerDiscarded(t *testing.T) {
- c := &canceller{}
- c.register(1, func() error {
- return errors.New("failed")
- })
-
- c.discard(1)
- assert.NoError(t, c.cancel(1))
-}
diff --git a/plugins/temporal/workflow/id_registry.go b/plugins/temporal/workflow/id_registry.go
deleted file mode 100644
index ac75cbda..00000000
--- a/plugins/temporal/workflow/id_registry.go
+++ /dev/null
@@ -1,51 +0,0 @@
-package workflow
-
-import (
- "sync"
-
- bindings "go.temporal.io/sdk/internalbindings"
-)
-
-// used to gain access to child workflow ids after they become available via callback result.
-type idRegistry struct {
- mu sync.Mutex
- ids map[uint64]entry
- listeners map[uint64]listener
-}
-
-type listener func(w bindings.WorkflowExecution, err error)
-
-type entry struct {
- w bindings.WorkflowExecution
- err error
-}
-
-func newIDRegistry() *idRegistry {
- return &idRegistry{
- ids: map[uint64]entry{},
- listeners: map[uint64]listener{},
- }
-}
-
-func (c *idRegistry) listen(id uint64, cl listener) {
- c.mu.Lock()
- defer c.mu.Unlock()
-
- c.listeners[id] = cl
-
- if e, ok := c.ids[id]; ok {
- cl(e.w, e.err)
- }
-}
-
-func (c *idRegistry) push(id uint64, w bindings.WorkflowExecution, err error) {
- c.mu.Lock()
- defer c.mu.Unlock()
-
- e := entry{w: w, err: err}
- c.ids[id] = e
-
- if l, ok := c.listeners[id]; ok {
- l(e.w, e.err)
- }
-}
diff --git a/plugins/temporal/workflow/message_queue.go b/plugins/temporal/workflow/message_queue.go
deleted file mode 100644
index 8f4409d1..00000000
--- a/plugins/temporal/workflow/message_queue.go
+++ /dev/null
@@ -1,47 +0,0 @@
-package workflow
-
-import (
- rrt "github.com/spiral/roadrunner/v2/plugins/temporal/protocol"
- "go.temporal.io/api/common/v1"
- "go.temporal.io/api/failure/v1"
-)
-
-type messageQueue struct {
- seqID func() uint64
- queue []rrt.Message
-}
-
-func newMessageQueue(sedID func() uint64) *messageQueue {
- return &messageQueue{
- seqID: sedID,
- queue: make([]rrt.Message, 0, 5),
- }
-}
-
-func (mq *messageQueue) flush() {
- mq.queue = mq.queue[0:0]
-}
-
-func (mq *messageQueue) allocateMessage(cmd interface{}, payloads *common.Payloads) (uint64, rrt.Message) {
- msg := rrt.Message{
- ID: mq.seqID(),
- Command: cmd,
- Payloads: payloads,
- }
-
- return msg.ID, msg
-}
-
-func (mq *messageQueue) pushCommand(cmd interface{}, payloads *common.Payloads) uint64 {
- id, msg := mq.allocateMessage(cmd, payloads)
- mq.queue = append(mq.queue, msg)
- return id
-}
-
-func (mq *messageQueue) pushResponse(id uint64, payloads *common.Payloads) {
- mq.queue = append(mq.queue, rrt.Message{ID: id, Payloads: payloads})
-}
-
-func (mq *messageQueue) pushError(id uint64, failure *failure.Failure) {
- mq.queue = append(mq.queue, rrt.Message{ID: id, Failure: failure})
-}
diff --git a/plugins/temporal/workflow/message_queue_test.go b/plugins/temporal/workflow/message_queue_test.go
deleted file mode 100644
index 1fcd409f..00000000
--- a/plugins/temporal/workflow/message_queue_test.go
+++ /dev/null
@@ -1,53 +0,0 @@
-package workflow
-
-import (
- "sync/atomic"
- "testing"
-
- "github.com/spiral/roadrunner/v2/plugins/temporal/protocol"
- "github.com/stretchr/testify/assert"
- "go.temporal.io/api/common/v1"
- "go.temporal.io/api/failure/v1"
-)
-
-func Test_MessageQueueFlushError(t *testing.T) {
- var index uint64
- mq := newMessageQueue(func() uint64 {
- return atomic.AddUint64(&index, 1)
- })
-
- mq.pushError(1, &failure.Failure{})
- assert.Len(t, mq.queue, 1)
-
- mq.flush()
- assert.Len(t, mq.queue, 0)
- assert.Equal(t, uint64(0), index)
-}
-
-func Test_MessageQueueFlushResponse(t *testing.T) {
- var index uint64
- mq := newMessageQueue(func() uint64 {
- return atomic.AddUint64(&index, 1)
- })
-
- mq.pushResponse(1, &common.Payloads{})
- assert.Len(t, mq.queue, 1)
-
- mq.flush()
- assert.Len(t, mq.queue, 0)
- assert.Equal(t, uint64(0), index)
-}
-
-func Test_MessageQueueCommandID(t *testing.T) {
- var index uint64
- mq := newMessageQueue(func() uint64 {
- return atomic.AddUint64(&index, 1)
- })
-
- n := mq.pushCommand(protocol.StartWorkflow{}, &common.Payloads{})
- assert.Equal(t, n, index)
- assert.Len(t, mq.queue, 1)
-
- mq.flush()
- assert.Len(t, mq.queue, 0)
-}
diff --git a/plugins/temporal/workflow/plugin.go b/plugins/temporal/workflow/plugin.go
deleted file mode 100644
index 572d9a3b..00000000
--- a/plugins/temporal/workflow/plugin.go
+++ /dev/null
@@ -1,203 +0,0 @@
-package workflow
-
-import (
- "context"
- "sync"
- "sync/atomic"
- "time"
-
- "github.com/cenkalti/backoff/v4"
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/events"
- "github.com/spiral/roadrunner/v2/pkg/worker"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/spiral/roadrunner/v2/plugins/server"
- "github.com/spiral/roadrunner/v2/plugins/temporal/client"
-)
-
-const (
- // PluginName defines public service name.
- PluginName = "workflows"
-
- // RRMode sets as RR_MODE env variable to let worker know about the mode to run.
- RRMode = "temporal/workflow"
-)
-
-// Plugin manages workflows and workers.
-type Plugin struct {
- temporal client.Temporal
- events events.Handler
- server server.Server
- log logger.Logger
- mu sync.Mutex
- reset chan struct{}
- pool workflowPool
- closing int64
-}
-
-// Init workflow plugin.
-func (p *Plugin) Init(temporal client.Temporal, server server.Server, log logger.Logger) error {
- p.temporal = temporal
- p.server = server
- p.events = events.NewEventsHandler()
- p.log = log
- p.reset = make(chan struct{}, 1)
-
- return nil
-}
-
-// Serve starts workflow service.
-func (p *Plugin) Serve() chan error {
- const op = errors.Op("workflow_plugin_serve")
- errCh := make(chan error, 1)
-
- pool, err := p.startPool()
- if err != nil {
- errCh <- errors.E(op, err)
- return errCh
- }
-
- p.pool = pool
-
- go func() {
- for {
- select {
- case <-p.reset:
- if atomic.LoadInt64(&p.closing) == 1 {
- return
- }
-
- err := p.replacePool()
- if err == nil {
- continue
- }
-
- bkoff := backoff.NewExponentialBackOff()
- bkoff.InitialInterval = time.Second
-
- err = backoff.Retry(p.replacePool, bkoff)
- if err != nil {
- errCh <- errors.E(op, err)
- }
- }
- }
- }()
-
- return errCh
-}
-
-// Stop workflow service.
-func (p *Plugin) Stop() error {
- const op = errors.Op("workflow_plugin_stop")
- atomic.StoreInt64(&p.closing, 1)
-
- pool := p.getPool()
- if pool != nil {
- p.pool = nil
- err := pool.Destroy(context.Background())
- if err != nil {
- return errors.E(op, err)
- }
- return nil
- }
-
- return nil
-}
-
-// Name of the service.
-func (p *Plugin) Name() string {
- return PluginName
-}
-
-// Workers returns list of available workflow workers.
-func (p *Plugin) Workers() []worker.BaseProcess {
- p.mu.Lock()
- defer p.mu.Unlock()
- return p.pool.Workers()
-}
-
-// WorkflowNames returns list of all available workflows.
-func (p *Plugin) WorkflowNames() []string {
- return p.pool.WorkflowNames()
-}
-
-// Reset resets underlying workflow pool with new copy.
-func (p *Plugin) Reset() error {
- p.reset <- struct{}{}
-
- return nil
-}
-
-// AddListener adds event listeners to the service.
-func (p *Plugin) poolListener(event interface{}) {
- if ev, ok := event.(PoolEvent); ok {
- if ev.Event == eventWorkerExit {
- if ev.Caused != nil {
- p.log.Error("Workflow pool error", "error", ev.Caused)
- }
- p.reset <- struct{}{}
- }
- }
-
- p.events.Push(event)
-}
-
-// AddListener adds event listeners to the service.
-func (p *Plugin) startPool() (workflowPool, error) {
- const op = errors.Op("workflow_plugin_start_pool")
- pool, err := newWorkflowPool(
- p.temporal.GetCodec().WithLogger(p.log),
- p.poolListener,
- p.server,
- )
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- err = pool.Start(context.Background(), p.temporal)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- p.log.Debug("Started workflow processing", "workflows", pool.WorkflowNames())
-
- return pool, nil
-}
-
-func (p *Plugin) replacePool() error {
- p.mu.Lock()
- const op = errors.Op("workflow_plugin_replace_pool")
- defer p.mu.Unlock()
-
- if p.pool != nil {
- err := p.pool.Destroy(context.Background())
- p.pool = nil
- if err != nil {
- p.log.Error(
- "Unable to destroy expired workflow pool",
- "error",
- errors.E(op, err),
- )
- return errors.E(op, err)
- }
- }
-
- pool, err := p.startPool()
- if err != nil {
- p.log.Error("Replace workflow pool failed", "error", err)
- return errors.E(op, err)
- }
-
- p.pool = pool
- p.log.Debug("workflow pool successfully replaced")
-
- return nil
-}
-
-// getPool returns currently pool.
-func (p *Plugin) getPool() workflowPool {
- p.mu.Lock()
- defer p.mu.Unlock()
-
- return p.pool
-}
diff --git a/plugins/temporal/workflow/process.go b/plugins/temporal/workflow/process.go
deleted file mode 100644
index 45e6885c..00000000
--- a/plugins/temporal/workflow/process.go
+++ /dev/null
@@ -1,436 +0,0 @@
-package workflow
-
-import (
- "strconv"
- "sync/atomic"
- "time"
-
- "github.com/spiral/errors"
- rrt "github.com/spiral/roadrunner/v2/plugins/temporal/protocol"
- commonpb "go.temporal.io/api/common/v1"
- bindings "go.temporal.io/sdk/internalbindings"
- "go.temporal.io/sdk/workflow"
-)
-
-// wraps single workflow process
-type workflowProcess struct {
- codec rrt.Codec
- pool workflowPool
- env bindings.WorkflowEnvironment
- header *commonpb.Header
- mq *messageQueue
- ids *idRegistry
- seqID uint64
- runID string
- pipeline []rrt.Message
- callbacks []func() error
- canceller *canceller
- inLoop bool
-}
-
-// Execute workflow, bootstraps process.
-func (wf *workflowProcess) Execute(env bindings.WorkflowEnvironment, header *commonpb.Header, input *commonpb.Payloads) {
- wf.env = env
- wf.header = header
- wf.seqID = 0
- wf.runID = env.WorkflowInfo().WorkflowExecution.RunID
- wf.canceller = &canceller{}
-
- // sequenceID shared for all worker workflows
- wf.mq = newMessageQueue(wf.pool.SeqID)
- wf.ids = newIDRegistry()
-
- env.RegisterCancelHandler(wf.handleCancel)
- env.RegisterSignalHandler(wf.handleSignal)
- env.RegisterQueryHandler(wf.handleQuery)
-
- var (
- lastCompletion = bindings.GetLastCompletionResult(env)
- lastCompletionOffset = 0
- )
-
- if lastCompletion != nil && len(lastCompletion.Payloads) != 0 {
- if input == nil {
- input = &commonpb.Payloads{Payloads: []*commonpb.Payload{}}
- }
-
- input.Payloads = append(input.Payloads, lastCompletion.Payloads...)
- lastCompletionOffset = len(lastCompletion.Payloads)
- }
-
- _ = wf.mq.pushCommand(
- rrt.StartWorkflow{
- Info: env.WorkflowInfo(),
- LastCompletion: lastCompletionOffset,
- },
- input,
- )
-}
-
-// OnWorkflowTaskStarted handles single workflow tick and batch of pipeline from temporal server.
-func (wf *workflowProcess) OnWorkflowTaskStarted() {
- wf.inLoop = true
- defer func() { wf.inLoop = false }()
-
- var err error
- for _, callback := range wf.callbacks {
- err = callback()
- if err != nil {
- panic(err)
- }
- }
- wf.callbacks = nil
-
- if err := wf.flushQueue(); err != nil {
- panic(err)
- }
-
- for len(wf.pipeline) > 0 {
- msg := wf.pipeline[0]
- wf.pipeline = wf.pipeline[1:]
-
- if msg.IsCommand() {
- err = wf.handleMessage(msg)
- }
-
- if err != nil {
- panic(err)
- }
- }
-}
-
-// StackTrace renders workflow stack trace.
-func (wf *workflowProcess) StackTrace() string {
- result, err := wf.runCommand(
- rrt.GetStackTrace{
- RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID,
- },
- nil,
- )
-
- if err != nil {
- return err.Error()
- }
-
- var stacktrace string
- err = wf.env.GetDataConverter().FromPayload(result.Payloads.Payloads[0], &stacktrace)
- if err != nil {
- return err.Error()
- }
-
- return stacktrace
-}
-
-// Close the workflow.
-func (wf *workflowProcess) Close() {
- // TODO: properly handle errors
- // panic(err)
-
- _ = wf.mq.pushCommand(
- rrt.DestroyWorkflow{RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID},
- nil,
- )
-
- _, _ = wf.discardQueue()
-}
-
-// execution context.
-func (wf *workflowProcess) getContext() rrt.Context {
- return rrt.Context{
- TaskQueue: wf.env.WorkflowInfo().TaskQueueName,
- TickTime: wf.env.Now().Format(time.RFC3339),
- Replay: wf.env.IsReplaying(),
- }
-}
-
-// schedule cancel command
-func (wf *workflowProcess) handleCancel() {
- _ = wf.mq.pushCommand(
- rrt.CancelWorkflow{RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID},
- nil,
- )
-}
-
-// schedule the signal processing
-func (wf *workflowProcess) handleSignal(name string, input *commonpb.Payloads) {
- _ = wf.mq.pushCommand(
- rrt.InvokeSignal{
- RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID,
- Name: name,
- },
- input,
- )
-}
-
-// Handle query in blocking mode.
-func (wf *workflowProcess) handleQuery(queryType string, queryArgs *commonpb.Payloads) (*commonpb.Payloads, error) {
- result, err := wf.runCommand(
- rrt.InvokeQuery{
- RunID: wf.runID,
- Name: queryType,
- },
- queryArgs,
- )
-
- if err != nil {
- return nil, err
- }
-
- if result.Failure != nil {
- return nil, bindings.ConvertFailureToError(result.Failure, wf.env.GetDataConverter())
- }
-
- return result.Payloads, nil
-}
-
-// process incoming command
-func (wf *workflowProcess) handleMessage(msg rrt.Message) error {
- const op = errors.Op("handleMessage")
- var err error
-
- var (
- id = msg.ID
- cmd = msg.Command
- payloads = msg.Payloads
- )
-
- switch cmd := cmd.(type) {
- case *rrt.ExecuteActivity:
- params := cmd.ActivityParams(wf.env, payloads)
- activityID := wf.env.ExecuteActivity(params, wf.createCallback(id))
-
- wf.canceller.register(id, func() error {
- wf.env.RequestCancelActivity(activityID)
- return nil
- })
-
- case *rrt.ExecuteChildWorkflow:
- params := cmd.WorkflowParams(wf.env, payloads)
-
- // always use deterministic id
- if params.WorkflowID == "" {
- nextID := atomic.AddUint64(&wf.seqID, 1)
- params.WorkflowID = wf.env.WorkflowInfo().WorkflowExecution.RunID + "_" + strconv.Itoa(int(nextID))
- }
-
- wf.env.ExecuteChildWorkflow(params, wf.createCallback(id), func(r bindings.WorkflowExecution, e error) {
- wf.ids.push(id, r, e)
- })
-
- wf.canceller.register(id, func() error {
- wf.env.RequestCancelChildWorkflow(params.Namespace, params.WorkflowID)
- return nil
- })
-
- case *rrt.GetChildWorkflowExecution:
- wf.ids.listen(cmd.ID, func(w bindings.WorkflowExecution, err error) {
- cl := wf.createCallback(id)
-
- // TODO rewrite
- if err != nil {
- panic(err)
- }
-
- p, err := wf.env.GetDataConverter().ToPayloads(w)
- if err != nil {
- panic(err)
- }
-
- cl(p, err)
- })
-
- case *rrt.NewTimer:
- timerID := wf.env.NewTimer(cmd.ToDuration(), wf.createCallback(id))
- wf.canceller.register(id, func() error {
- if timerID != nil {
- wf.env.RequestCancelTimer(*timerID)
- }
- return nil
- })
-
- case *rrt.GetVersion:
- version := wf.env.GetVersion(
- cmd.ChangeID,
- workflow.Version(cmd.MinSupported),
- workflow.Version(cmd.MaxSupported),
- )
-
- result, err := wf.env.GetDataConverter().ToPayloads(version)
- if err != nil {
- return errors.E(op, err)
- }
-
- wf.mq.pushResponse(id, result)
- err = wf.flushQueue()
- if err != nil {
- panic(err)
- }
-
- case *rrt.SideEffect:
- wf.env.SideEffect(
- func() (*commonpb.Payloads, error) {
- return payloads, nil
- },
- wf.createContinuableCallback(id),
- )
-
- case *rrt.CompleteWorkflow:
- result, _ := wf.env.GetDataConverter().ToPayloads("completed")
- wf.mq.pushResponse(id, result)
-
- if msg.Failure == nil {
- wf.env.Complete(payloads, nil)
- } else {
- wf.env.Complete(nil, bindings.ConvertFailureToError(msg.Failure, wf.env.GetDataConverter()))
- }
-
- case *rrt.ContinueAsNew:
- result, _ := wf.env.GetDataConverter().ToPayloads("completed")
- wf.mq.pushResponse(id, result)
-
- wf.env.Complete(nil, &workflow.ContinueAsNewError{
- WorkflowType: &bindings.WorkflowType{Name: cmd.Name},
- Input: payloads,
- Header: wf.header,
- TaskQueueName: cmd.Options.TaskQueueName,
- WorkflowExecutionTimeout: cmd.Options.WorkflowExecutionTimeout,
- WorkflowRunTimeout: cmd.Options.WorkflowRunTimeout,
- WorkflowTaskTimeout: cmd.Options.WorkflowTaskTimeout,
- })
-
- case *rrt.SignalExternalWorkflow:
- wf.env.SignalExternalWorkflow(
- cmd.Namespace,
- cmd.WorkflowID,
- cmd.RunID,
- cmd.Signal,
- payloads,
- nil,
- cmd.ChildWorkflowOnly,
- wf.createCallback(id),
- )
-
- case *rrt.CancelExternalWorkflow:
- wf.env.RequestCancelExternalWorkflow(cmd.Namespace, cmd.WorkflowID, cmd.RunID, wf.createCallback(id))
-
- case *rrt.Cancel:
- err = wf.canceller.cancel(cmd.CommandIDs...)
- if err != nil {
- return errors.E(op, err)
- }
-
- result, _ := wf.env.GetDataConverter().ToPayloads("completed")
- wf.mq.pushResponse(id, result)
-
- err = wf.flushQueue()
- if err != nil {
- panic(err)
- }
-
- case *rrt.Panic:
- panic(errors.E(cmd.Message))
-
- default:
- panic("undefined command")
- }
-
- return nil
-}
-
-func (wf *workflowProcess) createCallback(id uint64) bindings.ResultHandler {
- callback := func(result *commonpb.Payloads, err error) error {
- wf.canceller.discard(id)
-
- if err != nil {
- wf.mq.pushError(id, bindings.ConvertErrorToFailure(err, wf.env.GetDataConverter()))
- return nil
- }
-
- // fetch original payload
- wf.mq.pushResponse(id, result)
- return nil
- }
-
- return func(result *commonpb.Payloads, err error) {
- // timer cancel callback can happen inside the loop
- if wf.inLoop {
- err := callback(result, err)
- if err != nil {
- panic(err)
- }
-
- return
- }
-
- wf.callbacks = append(wf.callbacks, func() error {
- return callback(result, err)
- })
- }
-}
-
-// callback to be called inside the queue processing, adds new messages at the end of the queue
-func (wf *workflowProcess) createContinuableCallback(id uint64) bindings.ResultHandler {
- callback := func(result *commonpb.Payloads, err error) {
- wf.canceller.discard(id)
-
- if err != nil {
- wf.mq.pushError(id, bindings.ConvertErrorToFailure(err, wf.env.GetDataConverter()))
- return
- }
-
- wf.mq.pushResponse(id, result)
- err = wf.flushQueue()
- if err != nil {
- panic(err)
- }
- }
-
- return func(result *commonpb.Payloads, err error) {
- callback(result, err)
- }
-}
-
-// Exchange messages between host and worker processes and add new commands to the queue.
-func (wf *workflowProcess) flushQueue() error {
- const op = errors.Op("flush queue")
- messages, err := wf.codec.Execute(wf.pool, wf.getContext(), wf.mq.queue...)
- wf.mq.flush()
-
- if err != nil {
- return errors.E(op, err)
- }
-
- wf.pipeline = append(wf.pipeline, messages...)
-
- return nil
-}
-
-// Exchange messages between host and worker processes without adding new commands to the queue.
-func (wf *workflowProcess) discardQueue() ([]rrt.Message, error) {
- const op = errors.Op("discard queue")
- messages, err := wf.codec.Execute(wf.pool, wf.getContext(), wf.mq.queue...)
- wf.mq.flush()
-
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- return messages, nil
-}
-
-// Run single command and return single result.
-func (wf *workflowProcess) runCommand(cmd interface{}, payloads *commonpb.Payloads) (rrt.Message, error) {
- const op = errors.Op("workflow_process_runcommand")
- _, msg := wf.mq.allocateMessage(cmd, payloads)
-
- result, err := wf.codec.Execute(wf.pool, wf.getContext(), msg)
- if err != nil {
- return rrt.Message{}, errors.E(op, err)
- }
-
- if len(result) != 1 {
- return rrt.Message{}, errors.E(op, errors.Str("unexpected worker response"))
- }
-
- return result[0], nil
-}
diff --git a/plugins/temporal/workflow/workflow_pool.go b/plugins/temporal/workflow/workflow_pool.go
deleted file mode 100644
index b9ed46c8..00000000
--- a/plugins/temporal/workflow/workflow_pool.go
+++ /dev/null
@@ -1,190 +0,0 @@
-package workflow
-
-import (
- "context"
- "sync"
- "sync/atomic"
-
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/events"
- "github.com/spiral/roadrunner/v2/pkg/payload"
- rrWorker "github.com/spiral/roadrunner/v2/pkg/worker"
- "github.com/spiral/roadrunner/v2/plugins/server"
- "github.com/spiral/roadrunner/v2/plugins/temporal/client"
- rrt "github.com/spiral/roadrunner/v2/plugins/temporal/protocol"
- bindings "go.temporal.io/sdk/internalbindings"
- "go.temporal.io/sdk/worker"
- "go.temporal.io/sdk/workflow"
-)
-
-const eventWorkerExit = 8390
-
-// RR_MODE env variable key
-const RR_MODE = "RR_MODE" //nolint
-
-// RR_CODEC env variable key
-const RR_CODEC = "RR_CODEC" //nolint
-
-type workflowPool interface {
- SeqID() uint64
- Exec(p payload.Payload) (payload.Payload, error)
- Start(ctx context.Context, temporal client.Temporal) error
- Destroy(ctx context.Context) error
- Workers() []rrWorker.BaseProcess
- WorkflowNames() []string
-}
-
-// PoolEvent triggered on workflow pool worker events.
-type PoolEvent struct {
- Event int
- Context interface{}
- Caused error
-}
-
-// workflowPoolImpl manages workflowProcess executions between worker restarts.
-type workflowPoolImpl struct {
- codec rrt.Codec
- seqID uint64
- workflows map[string]rrt.WorkflowInfo
- tWorkers []worker.Worker
- mu sync.Mutex
- worker rrWorker.SyncWorker
- active bool
-}
-
-// newWorkflowPool creates new workflow pool.
-func newWorkflowPool(codec rrt.Codec, listener events.Listener, factory server.Server) (workflowPool, error) {
- const op = errors.Op("new_workflow_pool")
- w, err := factory.NewWorker(
- context.Background(),
- map[string]string{RR_MODE: RRMode, RR_CODEC: codec.GetName()},
- listener,
- )
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- go func() {
- err := w.Wait()
- listener(PoolEvent{Event: eventWorkerExit, Caused: err})
- }()
-
- return &workflowPoolImpl{codec: codec, worker: rrWorker.From(w)}, nil
-}
-
-// Start the pool in non blocking mode.
-func (pool *workflowPoolImpl) Start(ctx context.Context, temporal client.Temporal) error {
- const op = errors.Op("workflow_pool_start")
- pool.mu.Lock()
- pool.active = true
- pool.mu.Unlock()
-
- err := pool.initWorkers(ctx, temporal)
- if err != nil {
- return errors.E(op, err)
- }
-
- for i := 0; i < len(pool.tWorkers); i++ {
- err := pool.tWorkers[i].Start()
- if err != nil {
- return errors.E(op, err)
- }
- }
-
- return nil
-}
-
-// Active.
-func (pool *workflowPoolImpl) Active() bool {
- return pool.active
-}
-
-// Destroy stops all temporal workers and application worker.
-func (pool *workflowPoolImpl) Destroy(ctx context.Context) error {
- pool.mu.Lock()
- defer pool.mu.Unlock()
- const op = errors.Op("workflow_pool_destroy")
-
- pool.active = false
- for i := 0; i < len(pool.tWorkers); i++ {
- pool.tWorkers[i].Stop()
- }
-
- worker.PurgeStickyWorkflowCache()
-
- err := pool.worker.Stop()
- if err != nil {
- return errors.E(op, err)
- }
-
- return nil
-}
-
-// NewWorkflowDefinition initiates new workflow process.
-func (pool *workflowPoolImpl) NewWorkflowDefinition() bindings.WorkflowDefinition {
- return &workflowProcess{
- codec: pool.codec,
- pool: pool,
- }
-}
-
-// NewWorkflowDefinition initiates new workflow process.
-func (pool *workflowPoolImpl) SeqID() uint64 {
- return atomic.AddUint64(&pool.seqID, 1)
-}
-
-// Exec set of commands in thread safe move.
-func (pool *workflowPoolImpl) Exec(p payload.Payload) (payload.Payload, error) {
- pool.mu.Lock()
- defer pool.mu.Unlock()
-
- if !pool.active {
- return payload.Payload{}, nil
- }
-
- return pool.worker.Exec(p)
-}
-
-func (pool *workflowPoolImpl) Workers() []rrWorker.BaseProcess {
- return []rrWorker.BaseProcess{pool.worker}
-}
-
-func (pool *workflowPoolImpl) WorkflowNames() []string {
- names := make([]string, 0, len(pool.workflows))
- for name := range pool.workflows {
- names = append(names, name)
- }
-
- return names
-}
-
-// initWorkers request workers workflows from underlying PHP and configures temporal workers linked to the pool.
-func (pool *workflowPoolImpl) initWorkers(ctx context.Context, temporal client.Temporal) error {
- const op = errors.Op("workflow_pool_init_workers")
- workerInfo, err := rrt.FetchWorkerInfo(pool.codec, pool, temporal.GetDataConverter())
- if err != nil {
- return errors.E(op, err)
- }
-
- pool.workflows = make(map[string]rrt.WorkflowInfo)
- pool.tWorkers = make([]worker.Worker, 0)
-
- for _, info := range workerInfo {
- w, err := temporal.CreateWorker(info.TaskQueue, info.Options)
- if err != nil {
- return errors.E(op, err, pool.Destroy(ctx))
- }
-
- pool.tWorkers = append(pool.tWorkers, w)
- for _, workflowInfo := range info.Workflows {
- w.RegisterWorkflowWithOptions(pool, workflow.RegisterOptions{
- Name: workflowInfo.Name,
- DisableAlreadyRegisteredCheck: false,
- })
-
- pool.workflows[workflowInfo.Name] = workflowInfo
- }
- }
-
- return nil
-}
diff --git a/tests/composer.json b/tests/composer.json
index 060f105e..0cf74581 100644
--- a/tests/composer.json
+++ b/tests/composer.json
@@ -6,10 +6,5 @@
"spiral/roadrunner": "^2.0",
"spiral/roadrunner-http": "^2.0",
"temporal/sdk": "dev-master"
- },
- "autoload": {
- "psr-4": {
- "Temporal\\Tests\\": "temporal"
- }
}
}
diff --git a/tests/plugins/temporal/.rr.yaml b/tests/plugins/temporal/.rr.yaml
deleted file mode 100644
index 04d0730d..00000000
--- a/tests/plugins/temporal/.rr.yaml
+++ /dev/null
@@ -1,22 +0,0 @@
-# Application configuration
-rpc:
- listen: tcp://127.0.0.1:6001
-
-server:
- command: "php worker.php"
-
-# Workflow and activity mesh service
-temporal:
- address: "localhost:7233"
- activities:
- num_workers: 4
- codec: json
- debug_level: 2
-
-logs:
- mode: none
- channels:
- activities:
- mode: none
- #workflows:
- #mode: none \ No newline at end of file
diff --git a/tests/plugins/temporal/cancel_test.go b/tests/plugins/temporal/cancel_test.go
deleted file mode 100644
index 0fd3c126..00000000
--- a/tests/plugins/temporal/cancel_test.go
+++ /dev/null
@@ -1,291 +0,0 @@
-package tests
-
-import (
- "context"
- "testing"
- "time"
-
- "github.com/stretchr/testify/assert"
- "go.temporal.io/api/enums/v1"
- "go.temporal.io/api/history/v1"
- "go.temporal.io/sdk/client"
-)
-
-func Test_SimpleWorkflowCancel(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "SimpleSignalledWorkflow")
- assert.NoError(t, err)
-
- time.Sleep(time.Millisecond * 500)
- err = s.Client().CancelWorkflow(context.Background(), w.GetID(), w.GetRunID())
- assert.NoError(t, err)
-
- var result interface{}
- assert.Error(t, w.Get(context.Background(), &result))
-
- we, err := s.Client().DescribeWorkflowExecution(context.Background(), w.GetID(), w.GetRunID())
- assert.NoError(t, err)
-
- assert.Equal(t, "Canceled", we.WorkflowExecutionInfo.Status.String())
-}
-
-func Test_CancellableWorkflowScope(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "CancelledScopeWorkflow",
- "Hello World",
- )
- assert.NoError(t, err)
-
- var result string
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, "yes", result)
-
- s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool {
- return event.EventType == enums.EVENT_TYPE_TIMER_CANCELED
- })
-
- s.AssertNotContainsEvent(t, w, func(event *history.HistoryEvent) bool {
- return event.EventType == enums.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED
- })
-}
-
-func Test_CancelledWorkflow(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "CancelledWorkflow",
- "Hello World",
- )
- assert.NoError(t, err)
-
- time.Sleep(time.Second)
- err = s.Client().CancelWorkflow(context.Background(), w.GetID(), w.GetRunID())
- assert.NoError(t, err)
-
- var result interface{}
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, "CANCELLED", result)
-}
-
-func Test_CancelledWithCompensationWorkflow(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "CancelledWithCompensationWorkflow",
- "Hello World",
- )
- assert.NoError(t, err)
-
- time.Sleep(time.Second)
- err = s.Client().CancelWorkflow(context.Background(), w.GetID(), w.GetRunID())
- assert.NoError(t, err)
-
- var result interface{}
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, "OK", result)
-
- e, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "getStatus")
- assert.NoError(t, err)
-
- trace := make([]string, 0)
- assert.NoError(t, e.Get(&trace))
- assert.Equal(
- t,
- []string{
- "yield",
- "rollback",
- "captured retry",
- "captured promise on cancelled",
- "START rollback",
- "WAIT ROLLBACK",
- "RESULT (ROLLBACK)", "DONE rollback",
- "COMPLETE rollback",
- "result: OK",
- },
- trace,
- )
-}
-
-func Test_CancelledNestedWorkflow(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "CancelledNestedWorkflow",
- )
- assert.NoError(t, err)
-
- time.Sleep(time.Second)
-
- err = s.Client().CancelWorkflow(context.Background(), w.GetID(), w.GetRunID())
- assert.NoError(t, err)
-
- var result interface{}
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, "CANCELLED", result)
-
- e, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "getStatus")
- assert.NoError(t, err)
-
- trace := make([]string, 0)
- assert.NoError(t, e.Get(&trace))
- assert.Equal(
- t,
- []string{
- "begin",
- "first scope",
- "second scope",
- "close second scope",
- "close first scope",
- "second scope cancelled",
- "first scope cancelled",
- "close process",
- },
- trace,
- )
-}
-
-func Test_CancelledNSingleScopeWorkflow(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "CancelledSingleScopeWorkflow",
- )
- assert.NoError(t, err)
-
- time.Sleep(time.Second)
-
- err = s.Client().CancelWorkflow(context.Background(), w.GetID(), w.GetRunID())
- assert.NoError(t, err)
-
- var result interface{}
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, "OK", result)
-
- e, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "getStatus")
- assert.NoError(t, err)
-
- trace := make([]string, 0)
- assert.NoError(t, e.Get(&trace))
- assert.Equal(
- t,
- []string{
- "start",
- "in scope",
- "on cancel",
- "captured in scope",
- "captured in process",
- },
- trace,
- )
-}
-
-func Test_CancelledMidflightWorkflow(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "CancelledMidflightWorkflow",
- )
- assert.NoError(t, err)
-
- var result interface{}
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, "OK", result)
-
- e, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "getStatus")
- assert.NoError(t, err)
-
- trace := make([]string, 0)
- assert.NoError(t, e.Get(&trace))
- assert.Equal(
- t,
- []string{
- "start",
- "in scope",
- "on cancel",
- "done cancel",
- },
- trace,
- )
-
- s.AssertNotContainsEvent(t, w, func(event *history.HistoryEvent) bool {
- return event.EventType == enums.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED
- })
-}
-
-func Test_CancelSignalledChildWorkflow(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "CancelSignalledChildWorkflow",
- )
- assert.NoError(t, err)
-
- var result interface{}
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, "cancelled ok", result)
-
- e, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "getStatus")
- assert.NoError(t, err)
-
- trace := make([]string, 0)
- assert.NoError(t, e.Get(&trace))
- assert.Equal(
- t,
- []string{
- "start",
- "child started",
- "child signalled",
- "scope cancelled",
- "process done",
- },
- trace,
- )
-
- s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool {
- return event.EventType == enums.EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED
- })
-}
diff --git a/tests/plugins/temporal/child_test.go b/tests/plugins/temporal/child_test.go
deleted file mode 100644
index 49521791..00000000
--- a/tests/plugins/temporal/child_test.go
+++ /dev/null
@@ -1,84 +0,0 @@
-package tests
-
-import (
- "context"
- "testing"
-
- "github.com/stretchr/testify/assert"
- "go.temporal.io/sdk/client"
-)
-
-func Test_ExecuteChildWorkflow(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "WithChildWorkflow",
- "Hello World",
- )
- assert.NoError(t, err)
-
- var result string
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, "Child: CHILD HELLO WORLD", result)
-}
-
-func Test_ExecuteChildStubWorkflow(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "WithChildStubWorkflow",
- "Hello World",
- )
- assert.NoError(t, err)
-
- var result string
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, "Child: CHILD HELLO WORLD", result)
-}
-
-func Test_ExecuteChildStubWorkflow_02(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "ChildStubWorkflow",
- "Hello World",
- )
- assert.NoError(t, err)
-
- var result []string
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, []string{"HELLO WORLD", "UNTYPED"}, result)
-}
-
-func Test_SignalChildViaStubWorkflow(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "SignalChildViaStubWorkflow",
- )
- assert.NoError(t, err)
-
- var result int
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, 8, result)
-}
diff --git a/tests/plugins/temporal/disaster_test.go b/tests/plugins/temporal/disaster_test.go
deleted file mode 100644
index 9ca4d018..00000000
--- a/tests/plugins/temporal/disaster_test.go
+++ /dev/null
@@ -1,114 +0,0 @@
-package tests
-
-import (
- "context"
- "os"
- "testing"
- "time"
-
- "github.com/stretchr/testify/assert"
- "go.temporal.io/sdk/client"
-)
-
-func Test_WorkerError_DisasterRecovery(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- p, err := os.FindProcess(int(s.workflows.Workers()[0].Pid()))
- assert.NoError(t, err)
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "TimerWorkflow",
- "Hello World",
- )
- assert.NoError(t, err)
-
- time.Sleep(time.Millisecond * 750)
-
- // must fully recover with new worker
- assert.NoError(t, p.Kill())
-
- var result string
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, "hello world", result)
-}
-
-func Test_WorkerError_DisasterRecovery_Heavy(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- defer func() {
- // always restore script
- _ = os.Rename("worker.bak", "worker.php")
- }()
-
- // Makes worker pool unable to recover for some time
- _ = os.Rename("worker.php", "worker.bak")
-
- p, err := os.FindProcess(int(s.workflows.Workers()[0].Pid()))
- assert.NoError(t, err)
-
- // must fully recover with new worker
- assert.NoError(t, p.Kill())
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "TimerWorkflow",
- "Hello World",
- )
- assert.NoError(t, err)
-
- time.Sleep(time.Millisecond * 750)
-
- // restore the script and recover activity pool
- _ = os.Rename("worker.bak", "worker.php")
-
- var result string
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, "hello world", result)
-}
-
-func Test_ActivityError_DisasterRecovery(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- defer func() {
- // always restore script
- _ = os.Rename("worker.bak", "worker.php")
- }()
-
- // Makes worker pool unable to recover for some time
- _ = os.Rename("worker.php", "worker.bak")
-
- // destroys all workers in activities
- for _, wrk := range s.activities.Workers() {
- assert.NoError(t, wrk.Kill())
- }
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "SimpleWorkflow",
- "Hello World",
- )
- assert.NoError(t, err)
-
- // activity can't complete at this moment
- time.Sleep(time.Millisecond * 750)
-
- // restore the script and recover activity pool
- _ = os.Rename("worker.bak", "worker.php")
-
- var result string
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, "HELLO WORLD", result)
-}
diff --git a/tests/plugins/temporal/hp_test.go b/tests/plugins/temporal/hp_test.go
deleted file mode 100644
index bceac025..00000000
--- a/tests/plugins/temporal/hp_test.go
+++ /dev/null
@@ -1,408 +0,0 @@
-package tests
-
-import (
- "context"
- "crypto/rand"
- "crypto/sha512"
- "fmt"
- "testing"
- "time"
-
- "go.temporal.io/api/common/v1"
-
- "github.com/fatih/color"
- "github.com/stretchr/testify/assert"
- "go.temporal.io/api/enums/v1"
- "go.temporal.io/api/history/v1"
- "go.temporal.io/sdk/client"
-)
-
-func init() {
- color.NoColor = false
-}
-
-func Test_VerifyRegistration(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- assert.Contains(t, s.workflows.WorkflowNames(), "SimpleWorkflow")
-
- assert.Contains(t, s.activities.ActivityNames(), "SimpleActivity.echo")
- assert.Contains(t, s.activities.ActivityNames(), "HeartBeatActivity.doSomething")
-
- assert.Contains(t, s.activities.ActivityNames(), "SimpleActivity.lower")
-}
-
-func Test_ExecuteSimpleWorkflow_1(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "SimpleWorkflow",
- "Hello World",
- )
- assert.NoError(t, err)
-
- var result string
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, "HELLO WORLD", result)
-}
-
-type User struct {
- Name string
- Email string
-}
-
-func Test_ExecuteSimpleDTOWorkflow(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "SimpleDTOWorkflow",
- User{
- Name: "Antony",
- Email: "[email protected]",
- },
- )
- assert.NoError(t, err)
-
- var result struct{ Message string }
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, "Hello Antony <[email protected]>", result.Message)
-}
-
-func Test_ExecuteSimpleWorkflowWithSequenceInBatch(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "WorkflowWithSequence",
- "Hello World",
- )
- assert.NoError(t, err)
-
- var result string
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, "OK", result)
-}
-
-func Test_MultipleWorkflowsInSingleWorker(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "SimpleWorkflow",
- "Hello World",
- )
- assert.NoError(t, err)
-
- w2, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "TimerWorkflow",
- "Hello World",
- )
- assert.NoError(t, err)
-
- var result string
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, "HELLO WORLD", result)
-
- assert.NoError(t, w2.Get(context.Background(), &result))
- assert.Equal(t, "hello world", result)
-}
-
-func Test_ExecuteWorkflowWithParallelScopes(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "ParallelScopesWorkflow",
- "Hello World",
- )
- assert.NoError(t, err)
-
- var result string
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, "HELLO WORLD|Hello World|hello world", result)
-}
-
-func Test_Timer(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- start := time.Now()
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "TimerWorkflow",
- "Hello World",
- )
- assert.NoError(t, err)
-
- var result string
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, "hello world", result)
- assert.True(t, time.Since(start).Seconds() > 1)
-
- s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool {
- return event.EventType == enums.EVENT_TYPE_TIMER_STARTED
- })
-}
-
-func Test_SideEffect(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "SideEffectWorkflow",
- "Hello World",
- )
- assert.NoError(t, err)
-
- var result string
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Contains(t, result, "hello world-")
-
- s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool {
- return event.EventType == enums.EVENT_TYPE_MARKER_RECORDED
- })
-}
-
-func Test_EmptyWorkflow(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "EmptyWorkflow",
- "Hello World",
- )
- assert.NoError(t, err)
-
- var result int
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, 42, result)
-}
-
-func Test_PromiseChaining(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "ChainedWorkflow",
- "Hello World",
- )
- assert.NoError(t, err)
-
- var result string
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, "result:hello world", result)
-}
-
-func Test_ActivityHeartbeat(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "SimpleHeartbeatWorkflow",
- 2,
- )
- assert.NoError(t, err)
-
- time.Sleep(time.Second)
-
- we, err := s.Client().DescribeWorkflowExecution(context.Background(), w.GetID(), w.GetRunID())
- assert.NoError(t, err)
- assert.Len(t, we.PendingActivities, 1)
-
- act := we.PendingActivities[0]
-
- assert.Equal(t, `{"value":2}`, string(act.HeartbeatDetails.Payloads[0].Data))
-
- var result string
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, "OK", result)
-}
-
-func Test_FailedActivityHeartbeat(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "FailedHeartbeatWorkflow",
- 8,
- )
- assert.NoError(t, err)
-
- time.Sleep(time.Second)
-
- we, err := s.Client().DescribeWorkflowExecution(context.Background(), w.GetID(), w.GetRunID())
- assert.NoError(t, err)
- assert.Len(t, we.PendingActivities, 1)
-
- act := we.PendingActivities[0]
-
- assert.Equal(t, `{"value":8}`, string(act.HeartbeatDetails.Payloads[0].Data))
-
- var result string
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, "OK!", result)
-}
-
-func Test_BinaryPayload(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- rnd := make([]byte, 2500)
-
- _, err := rand.Read(rnd)
- assert.NoError(t, err)
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "BinaryWorkflow",
- rnd,
- )
- assert.NoError(t, err)
-
- var result string
- assert.NoError(t, w.Get(context.Background(), &result))
-
- assert.Equal(t, fmt.Sprintf("%x", sha512.Sum512(rnd)), result)
-}
-
-func Test_ContinueAsNew(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "ContinuableWorkflow",
- 1,
- )
- assert.NoError(t, err)
-
- time.Sleep(time.Second)
-
- we, err := s.Client().DescribeWorkflowExecution(context.Background(), w.GetID(), w.GetRunID())
- assert.NoError(t, err)
-
- assert.Equal(t, "ContinuedAsNew", we.WorkflowExecutionInfo.Status.String())
-
- time.Sleep(time.Second)
-
- // the result of the final workflow
- var result string
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, "OK6", result)
-}
-
-func Test_ActivityStubWorkflow(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "ActivityStubWorkflow",
- "hello world",
- )
- assert.NoError(t, err)
-
- // the result of the final workflow
- var result []string
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, []string{
- "HELLO WORLD",
- "invalid method call",
- "UNTYPED",
- }, result)
-}
-
-func Test_ExecuteProtoWorkflow(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "ProtoPayloadWorkflow",
- )
- assert.NoError(t, err)
-
- var result common.WorkflowExecution
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, "updated", result.RunId)
- assert.Equal(t, "workflow id", result.WorkflowId)
-}
-
-func Test_SagaWorkflow(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "SagaWorkflow",
- )
- assert.NoError(t, err)
-
- var result string
- assert.Error(t, w.Get(context.Background(), &result))
-}
diff --git a/tests/plugins/temporal/query_test.go b/tests/plugins/temporal/query_test.go
deleted file mode 100644
index 8b0caeee..00000000
--- a/tests/plugins/temporal/query_test.go
+++ /dev/null
@@ -1,66 +0,0 @@
-package tests
-
-import (
- "context"
- "testing"
- "time"
-
- "github.com/stretchr/testify/assert"
- "go.temporal.io/sdk/client"
-)
-
-func Test_ListQueries(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "QueryWorkflow",
- "Hello World",
- )
- assert.NoError(t, err)
-
- time.Sleep(time.Millisecond * 500)
-
- v, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "error", -1)
- assert.Nil(t, v)
- assert.Error(t, err)
-
- assert.Contains(t, err.Error(), "KnownQueryTypes=[get]")
-
- var r int
- assert.NoError(t, w.Get(context.Background(), &r))
- assert.Equal(t, 0, r)
-}
-
-func Test_GetQuery(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "QueryWorkflow",
- "Hello World",
- )
- assert.NoError(t, err)
-
- err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "add", 88)
- assert.NoError(t, err)
- time.Sleep(time.Millisecond * 500)
-
- v, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "get", nil)
- assert.NoError(t, err)
-
- var r int
- assert.NoError(t, v.Get(&r))
- assert.Equal(t, 88, r)
-
- assert.NoError(t, w.Get(context.Background(), &r))
- assert.Equal(t, 88, r)
-}
diff --git a/tests/plugins/temporal/server_test.go b/tests/plugins/temporal/server_test.go
deleted file mode 100644
index c8d815c3..00000000
--- a/tests/plugins/temporal/server_test.go
+++ /dev/null
@@ -1,198 +0,0 @@
-package tests
-
-import (
- "context"
- "testing"
-
- endure "github.com/spiral/endure/pkg/container"
- "github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/informer"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/spiral/roadrunner/v2/plugins/resetter"
- "github.com/spiral/roadrunner/v2/plugins/rpc"
- "github.com/spiral/roadrunner/v2/plugins/server"
- "github.com/spiral/roadrunner/v2/plugins/temporal/activity"
- rrClient "github.com/spiral/roadrunner/v2/plugins/temporal/client"
- "github.com/spiral/roadrunner/v2/plugins/temporal/workflow"
- "go.temporal.io/api/enums/v1"
- "go.temporal.io/api/history/v1"
- temporalClient "go.temporal.io/sdk/client"
- "go.uber.org/zap"
- "go.uber.org/zap/zapcore"
-)
-
-type TestServer struct {
- container endure.Container
- temporal rrClient.Temporal
- activities *activity.Plugin
- workflows *workflow.Plugin
-}
-
-type ConfigOption struct {
- Name string
- Value interface{}
-}
-
-func NewOption(name string, value interface{}) ConfigOption {
- return ConfigOption{Name: name, Value: value}
-}
-
-func NewTestServer(opt ...ConfigOption) *TestServer {
- e, err := endure.NewContainer(initLogger(), endure.RetryOnFail(false))
- if err != nil {
- panic(err)
- }
-
- t := &rrClient.Plugin{}
- a := &activity.Plugin{}
- w := &workflow.Plugin{}
-
- if err := e.Register(initConfig(opt...)); err != nil {
- panic(err)
- }
-
- if err := e.Register(&logger.ZapLogger{}); err != nil {
- panic(err)
- }
- if err := e.Register(&resetter.Plugin{}); err != nil {
- panic(err)
- }
- if err := e.Register(&informer.Plugin{}); err != nil {
- panic(err)
- }
- if err := e.Register(&server.Plugin{}); err != nil {
- panic(err)
- }
- if err := e.Register(&rpc.Plugin{}); err != nil {
- panic(err)
- }
-
- if err := e.Register(t); err != nil {
- panic(err)
- }
- if err := e.Register(a); err != nil {
- panic(err)
- }
- if err := e.Register(w); err != nil {
- panic(err)
- }
-
- if err := e.Init(); err != nil {
- panic(err)
- }
-
- errCh, err := e.Serve()
- if err != nil {
- panic(err)
- }
-
- go func() {
- err := <-errCh
- er := e.Stop()
- if er != nil {
- panic(err)
- }
- }()
-
- return &TestServer{container: e, temporal: t, activities: a, workflows: w}
-}
-
-func (s *TestServer) Client() temporalClient.Client {
- return s.temporal.GetClient()
-}
-
-func (s *TestServer) MustClose() {
- err := s.container.Stop()
- if err != nil {
- panic(err)
- }
-}
-
-func initConfig(opt ...ConfigOption) config.Configurer {
- cfg := &config.Viper{}
- cfg.Path = ".rr.yaml"
- cfg.Prefix = "rr"
-
- return cfg
-}
-
-func initLogger() *zap.Logger {
- cfg := zap.Config{
- Level: zap.NewAtomicLevelAt(zap.ErrorLevel),
- Encoding: "console",
- EncoderConfig: zapcore.EncoderConfig{
- MessageKey: "message",
- LevelKey: "level",
- TimeKey: "time",
- CallerKey: "caller",
- NameKey: "name",
- StacktraceKey: "stack",
- EncodeLevel: zapcore.CapitalLevelEncoder,
- EncodeTime: zapcore.ISO8601TimeEncoder,
- EncodeCaller: zapcore.ShortCallerEncoder,
- },
- OutputPaths: []string{"stderr"},
- ErrorOutputPaths: []string{"stderr"},
- }
-
- l, err := cfg.Build(zap.AddCaller())
- if err != nil {
- panic(err)
- }
-
- return l
-}
-
-func (s *TestServer) AssertContainsEvent(t *testing.T, w temporalClient.WorkflowRun, assert func(*history.HistoryEvent) bool) {
- i := s.Client().GetWorkflowHistory(
- context.Background(),
- w.GetID(),
- w.GetRunID(),
- false,
- enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT,
- )
-
- for {
- if !i.HasNext() {
- t.Error("no more events and no match found")
- break
- }
-
- e, err := i.Next()
- if err != nil {
- t.Error("unable to read history event")
- break
- }
-
- if assert(e) {
- break
- }
- }
-}
-
-func (s *TestServer) AssertNotContainsEvent(t *testing.T, w temporalClient.WorkflowRun, assert func(*history.HistoryEvent) bool) {
- i := s.Client().GetWorkflowHistory(
- context.Background(),
- w.GetID(),
- w.GetRunID(),
- false,
- enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT,
- )
-
- for {
- if !i.HasNext() {
- break
- }
-
- e, err := i.Next()
- if err != nil {
- t.Error("unable to read history event")
- break
- }
-
- if assert(e) {
- t.Error("found unexpected event")
- break
- }
- }
-}
diff --git a/tests/plugins/temporal/signal_test.go b/tests/plugins/temporal/signal_test.go
deleted file mode 100644
index 51826287..00000000
--- a/tests/plugins/temporal/signal_test.go
+++ /dev/null
@@ -1,170 +0,0 @@
-package tests
-
-import (
- "context"
- "testing"
- "time"
-
- "github.com/pborman/uuid"
- "github.com/stretchr/testify/assert"
- "go.temporal.io/api/enums/v1"
- "go.temporal.io/api/history/v1"
- "go.temporal.io/sdk/client"
-)
-
-func Test_SignalsWithoutSignals(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "SimpleSignalledWorkflow",
- "Hello World",
- )
- assert.NoError(t, err)
-
- var result int
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, 0, result)
-}
-
-func Test_SendSignalDuringTimer(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().SignalWithStartWorkflow(
- context.Background(),
- "signalled-"+uuid.New(),
- "add",
- 10,
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "SimpleSignalledWorkflow",
- )
- assert.NoError(t, err)
-
- err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "add", -1)
- assert.NoError(t, err)
-
- var result int
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, 9, result)
-
- s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool {
- if event.EventType == enums.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED {
- attr := event.Attributes.(*history.HistoryEvent_WorkflowExecutionSignaledEventAttributes)
- return attr.WorkflowExecutionSignaledEventAttributes.SignalName == "add"
- }
-
- return false
- })
-}
-
-func Test_SendSignalBeforeCompletingWorkflow(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "SimpleSignalledWorkflowWithSleep",
- )
- assert.NoError(t, err)
-
- // should be around sleep(1) call
- time.Sleep(time.Second + time.Millisecond*200)
-
- err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "add", -1)
- assert.NoError(t, err)
-
- var result int
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, -1, result)
-
- s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool {
- if event.EventType == enums.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED {
- attr := event.Attributes.(*history.HistoryEvent_WorkflowExecutionSignaledEventAttributes)
- return attr.WorkflowExecutionSignaledEventAttributes.SignalName == "add"
- }
-
- return false
- })
-}
-
-func Test_RuntimeSignal(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().SignalWithStartWorkflow(
- context.Background(),
- "signalled-"+uuid.New(),
- "add",
- -1,
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "RuntimeSignalWorkflow",
- )
- assert.NoError(t, err)
-
- var result int
- assert.NoError(t, w.Get(context.Background(), &result))
- assert.Equal(t, -1, result)
-
- s.AssertContainsEvent(t, w, func(event *history.HistoryEvent) bool {
- if event.EventType == enums.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED {
- attr := event.Attributes.(*history.HistoryEvent_WorkflowExecutionSignaledEventAttributes)
- return attr.WorkflowExecutionSignaledEventAttributes.SignalName == "add"
- }
-
- return false
- })
-}
-
-func Test_SignalSteps(t *testing.T) {
- s := NewTestServer()
- defer s.MustClose()
-
- w, err := s.Client().ExecuteWorkflow(
- context.Background(),
- client.StartWorkflowOptions{
- TaskQueue: "default",
- },
- "WorkflowWithSignalledSteps",
- )
- assert.NoError(t, err)
-
- err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "begin", true)
- assert.NoError(t, err)
-
- err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "next1", true)
- assert.NoError(t, err)
-
- v, err := s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "value", nil)
- assert.NoError(t, err)
-
- var r int
- assert.NoError(t, v.Get(&r))
- assert.Equal(t, 2, r)
-
- err = s.Client().SignalWorkflow(context.Background(), w.GetID(), w.GetRunID(), "next2", true)
- assert.NoError(t, err)
-
- v, err = s.Client().QueryWorkflow(context.Background(), w.GetID(), w.GetRunID(), "value", nil)
- assert.NoError(t, err)
-
- assert.NoError(t, v.Get(&r))
- assert.Equal(t, 3, r)
-
- var result int
- assert.NoError(t, w.Get(context.Background(), &result))
-
- // 3 ticks
- assert.Equal(t, 3, result)
-}
diff --git a/tests/plugins/temporal/worker.php b/tests/plugins/temporal/worker.php
deleted file mode 100644
index 0d0263e7..00000000
--- a/tests/plugins/temporal/worker.php
+++ /dev/null
@@ -1,33 +0,0 @@
-<?php
-
-declare(strict_types=1);
-
-require __DIR__ . '/../../vendor/autoload.php';
-
-/**
- * @param string $dir
- * @return array<string>
- */
-$getClasses = static function (string $dir): iterable {
- $files = glob($dir . '/*.php');
-
- foreach ($files as $file) {
- yield substr(basename($file), 0, -4);
- }
-};
-
-$factory = \Temporal\WorkerFactory::create();
-
-$worker = $factory->newWorker('default');
-
-// register all workflows
-foreach ($getClasses(__DIR__ . '/../../temporal/Workflow') as $name) {
- $worker->registerWorkflowType('Temporal\\Tests\\Workflow\\' . $name);
-}
-
-// register all activity
-foreach ($getClasses(__DIR__ . '/../../temporal/Activity') as $name) {
- $worker->registerActivityType('Temporal\\Tests\\Activity\\' . $name);
-}
-
-$factory->run();
diff --git a/tests/temporal/Activity/HeartBeatActivity.php b/tests/temporal/Activity/HeartBeatActivity.php
deleted file mode 100644
index acf4a451..00000000
--- a/tests/temporal/Activity/HeartBeatActivity.php
+++ /dev/null
@@ -1,58 +0,0 @@
-<?php
-
-namespace Temporal\Tests\Activity;
-
-use Temporal\Activity;
-use Temporal\Activity\ActivityInterface;
-use Temporal\Activity\ActivityMethod;
-use Temporal\Roadrunner\Internal\Error;
-
-#[ActivityInterface(prefix: "HeartBeatActivity.")]
-class HeartBeatActivity
-{
- #[ActivityMethod]
- public function doSomething(
- int $value
- ): string {
- Activity::heartbeat(['value' => $value]);
- sleep($value);
- return 'OK';
- }
-
- #[ActivityMethod]
- public function slow(
- string $value
- ): string {
- for ($i = 0; $i < 5; $i++) {
- Activity::heartbeat(['value' => $i]);
- sleep(1);
- }
-
- return 'OK';
- }
-
- #[ActivityMethod]
- public function something(
- string $value
- ): string {
- Activity::heartbeat(['value' => $value]);
- sleep($value);
- return 'OK';
- }
-
- #[ActivityMethod]
- public function failedActivity(
- int $value
- ): string {
- Activity::heartbeat(['value' => $value]);
- if (Activity::getInfo()->attempt === 1) {
- throw new \Error("failed");
- }
-
- if (!is_array(Activity::getHeartbeatDetails())) {
- throw new \Error("no heartbeat details");
- }
-
- return 'OK!';
- }
-} \ No newline at end of file
diff --git a/tests/temporal/Activity/SimpleActivity.php b/tests/temporal/Activity/SimpleActivity.php
deleted file mode 100644
index 576b126e..00000000
--- a/tests/temporal/Activity/SimpleActivity.php
+++ /dev/null
@@ -1,63 +0,0 @@
-<?php
-
-namespace Temporal\Tests\Activity;
-
-use Temporal\Activity\ActivityInterface;
-use Temporal\Activity\ActivityMethod;
-use Temporal\Api\Common\V1\WorkflowExecution;
-use Temporal\DataConverter\Bytes;
-use Temporal\Tests\DTO\Message;
-use Temporal\Tests\DTO\User;
-
-#[ActivityInterface(prefix: "SimpleActivity.")]
-class SimpleActivity
-{
- #[ActivityMethod]
- public function echo(
- string $input
- ): string {
- return strtoupper($input);
- }
-
- #[ActivityMethod]
- public function lower(
- string $input
- ): string {
- return strtolower($input);
- }
-
- #[ActivityMethod]
- public function greet(
- User $user
- ): Message {
- return new Message(sprintf("Hello %s <%s>", $user->name, $user->email));
- }
-
- #[ActivityMethod]
- public function slow(
- string $input
- ): string {
- sleep(2);
-
- return strtolower($input);
- }
-
- #[ActivityMethod]
- public function sha512(
- Bytes $input
- ): string {
- return hash("sha512", ($input->getData()));
- }
-
- public function updateRunID(WorkflowExecution $e): WorkflowExecution
- {
- $e->setRunId('updated');
- return $e;
- }
-
- #[ActivityMethod]
- public function fail()
- {
- throw new \Error("failed activity");
- }
-} \ No newline at end of file
diff --git a/tests/temporal/Client/StartNewWorkflow.php b/tests/temporal/Client/StartNewWorkflow.php
deleted file mode 100644
index 67bc1d01..00000000
--- a/tests/temporal/Client/StartNewWorkflow.php
+++ /dev/null
@@ -1,23 +0,0 @@
-<?php
-
-
-namespace Temporal\Tests\Client;
-
-use Temporal\Client;
-use Temporal\Tests\Workflow\SimpleDTOWorkflow;
-
-use function Symfony\Component\String\s;
-
-class StartNewWorkflow
-{
- private $stub;
-
- public function __construct(Client\ClientInterface $client)
- {
- $this->stub = $client->newWorkflowStub(SimpleDTOWorkflow::class);
- }
-
- public function __invoke()
- {
- }
-}
diff --git a/tests/temporal/DTO/Message.php b/tests/temporal/DTO/Message.php
deleted file mode 100644
index 61703fe8..00000000
--- a/tests/temporal/DTO/Message.php
+++ /dev/null
@@ -1,14 +0,0 @@
-<?php
-
-
-namespace Temporal\Tests\DTO;
-
-class Message
-{
- public string $message;
-
- public function __construct(string $message)
- {
- $this->message = $message;
- }
-} \ No newline at end of file
diff --git a/tests/temporal/DTO/User.php b/tests/temporal/DTO/User.php
deleted file mode 100644
index cefea137..00000000
--- a/tests/temporal/DTO/User.php
+++ /dev/null
@@ -1,15 +0,0 @@
-<?php
-
-
-namespace Temporal\Tests\DTO;
-
-use Temporal\Internal\Marshaller\Meta\Marshal;
-
-class User
-{
- #[Marshal(name: "Name")]
- public string $name;
-
- #[Marshal(name: "Email")]
- public string $email;
-} \ No newline at end of file
diff --git a/tests/temporal/Workflow/ActivityStubWorkflow.php b/tests/temporal/Workflow/ActivityStubWorkflow.php
deleted file mode 100644
index 58dcdafb..00000000
--- a/tests/temporal/Workflow/ActivityStubWorkflow.php
+++ /dev/null
@@ -1,39 +0,0 @@
-<?php
-
-namespace Temporal\Tests\Workflow;
-
-use Temporal\Activity\ActivityOptions;
-use Temporal\Tests\Activity\SimpleActivity;
-use Temporal\Workflow;
-use Temporal\Workflow\WorkflowMethod;
-
-#[Workflow\WorkflowInterface]
-class ActivityStubWorkflow
-{
- #[WorkflowMethod(name: 'ActivityStubWorkflow')]
- public function handler(
- string $input
- ) {
- // typed stub
- $simple = Workflow::newActivityStub(
- SimpleActivity::class,
- ActivityOptions::new()->withStartToCloseTimeout(5)
- );
-
- $result = [];
- $result[] = yield $simple->echo($input);
-
- try {
- $simple->undefined($input);
- } catch (\BadMethodCallException $e) {
- $result[] = 'invalid method call';
- }
-
- // untyped stub
- $untyped = Workflow::newUntypedActivityStub(ActivityOptions::new()->withStartToCloseTimeout(1));
-
- $result[] = yield $untyped->execute('SimpleActivity.echo', ['untyped']);
-
- return $result;
- }
-}
diff --git a/tests/temporal/Workflow/AggregatedWorkflow.php b/tests/temporal/Workflow/AggregatedWorkflow.php
deleted file mode 100644
index 3299179e..00000000
--- a/tests/temporal/Workflow/AggregatedWorkflow.php
+++ /dev/null
@@ -1,30 +0,0 @@
-<?php
-
-namespace Temporal\Tests\Workflow;
-
-use Temporal\Workflow;
-use Temporal\Workflow\SignalMethod;
-use Temporal\Workflow\WorkflowInterface;
-use Temporal\Workflow\WorkflowMethod;
-
-#[WorkflowInterface]
-class AggregatedWorkflow
-{
- private array $values = [];
-
- #[SignalMethod]
- public function addValue(
- string $value
- ) {
- $this->values[] = $value;
- }
-
- #[WorkflowMethod(name: 'AggregatedWorkflow')]
- public function run(
- int $count
- ) {
- yield Workflow::await(fn() => count($this->values) === $count);
-
- return $this->values;
- }
-}
diff --git a/tests/temporal/Workflow/AsyncActivityWorkflow.php b/tests/temporal/Workflow/AsyncActivityWorkflow.php
deleted file mode 100644
index 79e45dfb..00000000
--- a/tests/temporal/Workflow/AsyncActivityWorkflow.php
+++ /dev/null
@@ -1,28 +0,0 @@
-<?php
-
-declare(strict_types=1);
-
-namespace Temporal\Tests\Workflow;
-
-use Temporal\Activity\ActivityCancellationType;
-use Temporal\Activity\ActivityOptions;
-use Temporal\Workflow;
-use Temporal\Workflow\WorkflowMethod;
-use Temporal\Tests\Activity\SimpleActivity;
-
-#[Workflow\WorkflowInterface]
-class AsyncActivityWorkflow
-{
- #[WorkflowMethod(name: 'AsyncActivityWorkflow')]
- public function handler()
- {
- $simple = Workflow::newActivityStub(
- SimpleActivity::class,
- ActivityOptions::new()
- ->withStartToCloseTimeout(20)
- ->withCancellationType(ActivityCancellationType::WAIT_CANCELLATION_COMPLETED)
- );
-
- return yield $simple->external();
- }
-}
diff --git a/tests/temporal/Workflow/BinaryWorkflow.php b/tests/temporal/Workflow/BinaryWorkflow.php
deleted file mode 100644
index ed1952ad..00000000
--- a/tests/temporal/Workflow/BinaryWorkflow.php
+++ /dev/null
@@ -1,21 +0,0 @@
-<?php
-
-namespace Temporal\Tests\Workflow;
-
-use Temporal\Activity\ActivityOptions;
-use Temporal\DataConverter\Bytes;
-use Temporal\Workflow;
-use Temporal\Workflow\WorkflowMethod;
-
-#[Workflow\WorkflowInterface]
-class BinaryWorkflow
-{
- #[WorkflowMethod(name: 'BinaryWorkflow')]
- public function handler(
- Bytes $input
- ): iterable {
- $opts = ActivityOptions::new()->withStartToCloseTimeout(5);
-
- return yield Workflow::executeActivity('SimpleActivity.sha512', [$input], $opts);
- }
-}
diff --git a/tests/temporal/Workflow/CancelSignalledChildWorkflow.php b/tests/temporal/Workflow/CancelSignalledChildWorkflow.php
deleted file mode 100644
index e2e43efa..00000000
--- a/tests/temporal/Workflow/CancelSignalledChildWorkflow.php
+++ /dev/null
@@ -1,57 +0,0 @@
-<?php
-
-namespace Temporal\Tests\Workflow;
-
-use React\Promise\Deferred;
-use Temporal\Workflow;
-use Temporal\Workflow\WorkflowMethod;
-
-#[Workflow\WorkflowInterface]
-class CancelSignalledChildWorkflow
-{
- private array $status = [];
-
- #[Workflow\QueryMethod(name: 'getStatus')]
- public function getStatus(): array
- {
- return $this->status;
- }
-
- #[WorkflowMethod(name: 'CancelSignalledChildWorkflow')]
- public function handler()
- {
- // typed stub
- $simple = Workflow::newChildWorkflowStub(SimpleSignalledWorkflow::class);
-
- $waitSignalled = new Deferred();
-
- $this->status[] = 'start';
-
- // start execution
- $scope = Workflow::newCancellationScope(
- function () use ($simple, $waitSignalled) {
- $call = $simple->handler();
- $this->status[] = 'child started';
-
- yield $simple->add(8);
- $this->status[] = 'child signalled';
- $waitSignalled->resolve();
-
- return yield $call;
- }
- );
-
- // only cancel scope when signal dispatched
- yield $waitSignalled;
- $scope->cancel();
- $this->status[] = 'scope cancelled';
-
- try {
- return yield $scope;
- } catch (\Throwable $e) {
- $this->status[] = 'process done';
-
- return 'cancelled ok';
- }
- }
-}
diff --git a/tests/temporal/Workflow/CanceledHeartbeatWorkflow.php b/tests/temporal/Workflow/CanceledHeartbeatWorkflow.php
deleted file mode 100644
index 6b463192..00000000
--- a/tests/temporal/Workflow/CanceledHeartbeatWorkflow.php
+++ /dev/null
@@ -1,29 +0,0 @@
-<?php
-
-declare(strict_types=1);
-
-namespace Temporal\Tests\Workflow;
-
-use Temporal\Activity\ActivityCancellationType;
-use Temporal\Activity\ActivityOptions;
-use Temporal\Workflow;
-use Temporal\Workflow\WorkflowMethod;
-use Temporal\Tests\Activity\HeartBeatActivity;
-
-#[Workflow\WorkflowInterface]
-class CanceledHeartbeatWorkflow
-{
- #[WorkflowMethod(name: 'CanceledHeartbeatWorkflow')]
- public function handler(): iterable
- {
- $act = Workflow::newActivityStub(
- HeartBeatActivity::class,
- ActivityOptions::new()
- ->withStartToCloseTimeout(50)
- ->withCancellationType(ActivityCancellationType::WAIT_CANCELLATION_COMPLETED)
- ->withHeartbeatTimeout(1)
- );
-
- return yield $act->slow('test');
- }
-}
diff --git a/tests/temporal/Workflow/CancelledMidflightWorkflow.php b/tests/temporal/Workflow/CancelledMidflightWorkflow.php
deleted file mode 100644
index ea799ce1..00000000
--- a/tests/temporal/Workflow/CancelledMidflightWorkflow.php
+++ /dev/null
@@ -1,47 +0,0 @@
-<?php
-
-namespace Temporal\Tests\Workflow;
-
-use Temporal\Activity\ActivityOptions;
-use Temporal\Tests\Activity\SimpleActivity;
-use Temporal\Workflow;
-use Temporal\Workflow\WorkflowMethod;
-
-#[Workflow\WorkflowInterface]
-class CancelledMidflightWorkflow
-{
- private array $status = [];
-
- #[Workflow\QueryMethod(name: 'getStatus')]
- public function getStatus(): array
- {
- return $this->status;
- }
-
- #[WorkflowMethod(name: 'CancelledMidflightWorkflow')]
- public function handler()
- {
- $simple = Workflow::newActivityStub(
- SimpleActivity::class,
- ActivityOptions::new()->withStartToCloseTimeout(5)
- );
-
- $this->status[] = 'start';
-
- $scope = Workflow::newCancellationScope(
- function () use ($simple) {
- $this->status[] = 'in scope';
- $simple->slow('1');
- }
- )->onCancel(
- function () {
- $this->status[] = 'on cancel';
- }
- );
-
- $scope->cancel();
- $this->status[] = 'done cancel';
-
- return 'OK';
- }
-}
diff --git a/tests/temporal/Workflow/CancelledNestedWorkflow.php b/tests/temporal/Workflow/CancelledNestedWorkflow.php
deleted file mode 100644
index 0c82f761..00000000
--- a/tests/temporal/Workflow/CancelledNestedWorkflow.php
+++ /dev/null
@@ -1,72 +0,0 @@
-<?php
-
-namespace Temporal\Tests\Workflow;
-
-use Temporal\Exception\Failure\CanceledFailure;
-use Temporal\Workflow;
-use Temporal\Workflow\WorkflowMethod;
-
-#[Workflow\WorkflowInterface]
-class CancelledNestedWorkflow
-{
- private array $status = [];
-
- #[Workflow\QueryMethod(name: 'getStatus')]
- public function getStatus(): array
- {
- return $this->status;
- }
-
- #[WorkflowMethod(name: 'CancelledNestedWorkflow')]
- public function handler()
- {
- $this->status[] = 'begin';
- try {
- yield Workflow::newCancellationScope(
- function () {
- $this->status[] = 'first scope';
-
- $scope = Workflow::newCancellationScope(
- function () {
- $this->status[] = 'second scope';
-
- try {
- yield Workflow::timer(2);
- } catch (CanceledFailure $e) {
- $this->status[] = 'second scope cancelled';
- throw $e;
- }
-
- $this->status[] = 'second scope done';
- }
- )->onCancel(
- function () {
- $this->status[] = 'close second scope';
- }
- );
-
- try {
- yield Workflow::timer(1);
- } catch (CanceledFailure $e) {
- $this->status[] = 'first scope cancelled';
- throw $e;
- }
-
- $this->status[] = 'first scope done';
-
- yield $scope;
- }
- )->onCancel(
- function () {
- $this->status[] = 'close first scope';
- }
- );
- } catch (CanceledFailure $e) {
- $this->status[] = 'close process';
-
- return 'CANCELLED';
- }
-
- return 'OK';
- }
-}
diff --git a/tests/temporal/Workflow/CancelledScopeWorkflow.php b/tests/temporal/Workflow/CancelledScopeWorkflow.php
deleted file mode 100644
index 50e0992f..00000000
--- a/tests/temporal/Workflow/CancelledScopeWorkflow.php
+++ /dev/null
@@ -1,39 +0,0 @@
-<?php
-
-namespace Temporal\Tests\Workflow;
-
-use Temporal\Activity\ActivityOptions;
-use Temporal\Workflow;
-use Temporal\Workflow\WorkflowMethod;
-use Temporal\Tests\Activity\SimpleActivity;
-
-#[Workflow\WorkflowInterface]
-class CancelledScopeWorkflow
-{
- #[WorkflowMethod(name: 'CancelledScopeWorkflow')]
- public function handler()
- {
- $simple = Workflow::newActivityStub(
- SimpleActivity::class,
- ActivityOptions::new()->withStartToCloseTimeout(5)
- );
-
- $cancelled = 'not';
-
- $scope = Workflow::newCancellationScope(
- function () use ($simple) {
- yield Workflow::timer(2);
- yield $simple->slow('hello');
- }
- )->onCancel(
- function () use (&$cancelled) {
- $cancelled = 'yes';
- }
- );
-
- yield Workflow::timer(1);
- $scope->cancel();
-
- return $cancelled;
- }
-}
diff --git a/tests/temporal/Workflow/CancelledSingleScopeWorkflow.php b/tests/temporal/Workflow/CancelledSingleScopeWorkflow.php
deleted file mode 100644
index 5fe8d3d8..00000000
--- a/tests/temporal/Workflow/CancelledSingleScopeWorkflow.php
+++ /dev/null
@@ -1,55 +0,0 @@
-<?php
-
-namespace Temporal\Tests\Workflow;
-
-use Temporal\Activity\ActivityOptions;
-use Temporal\Exception\Failure\CanceledFailure;
-use Temporal\Workflow;
-use Temporal\Workflow\WorkflowMethod;
-use Temporal\Tests\Activity\SimpleActivity;
-
-#[Workflow\WorkflowInterface]
-class CancelledSingleScopeWorkflow
-{
- private array $status = [];
-
- #[Workflow\QueryMethod(name: 'getStatus')]
- public function getStatus(): array
- {
- return $this->status;
- }
-
- #[WorkflowMethod(name: 'CancelledSingleScopeWorkflow')]
- public function handler()
- {
- $simple = Workflow::newActivityStub(
- SimpleActivity::class,
- ActivityOptions::new()
- ->withStartToCloseTimeout(5)
- );
-
- $this->status[] = 'start';
- try {
- yield Workflow::newCancellationScope(
- function () use ($simple) {
- try {
- $this->status[] = 'in scope';
- yield $simple->slow('1');
- } catch (CanceledFailure $e) {
- // after process is complete, do not use for business logic
- $this->status[] = 'captured in scope';
- throw $e;
- }
- }
- )->onCancel(
- function () {
- $this->status[] = 'on cancel';
- }
- );
- } catch (CanceledFailure $e) {
- $this->status[] = 'captured in process';
- }
-
- return 'OK';
- }
-}
diff --git a/tests/temporal/Workflow/CancelledWithCompensationWorkflow.php b/tests/temporal/Workflow/CancelledWithCompensationWorkflow.php
deleted file mode 100644
index 2074aac1..00000000
--- a/tests/temporal/Workflow/CancelledWithCompensationWorkflow.php
+++ /dev/null
@@ -1,79 +0,0 @@
-<?php
-
-namespace Temporal\Tests\Workflow;
-
-use Temporal\Activity\ActivityOptions;
-use Temporal\Exception\Failure\CanceledFailure;
-use Temporal\Tests\Activity\SimpleActivity;
-use Temporal\Workflow;
-use Temporal\Workflow\WorkflowMethod;
-
-#[Workflow\WorkflowInterface]
-class CancelledWithCompensationWorkflow
-{
- private array $status = [];
-
- #[Workflow\QueryMethod(name: 'getStatus')]
- public function getStatus(): array
- {
- return $this->status;
- }
-
- #[WorkflowMethod(name: 'CancelledWithCompensationWorkflow')]
- public function handler()
- {
- $simple = Workflow::newActivityStub(
- SimpleActivity::class,
- ActivityOptions::new()->withStartToCloseTimeout(5)
- );
-
- // waits for 2 seconds
- $slow = $simple->slow('DOING SLOW ACTIVITY');
-
- try {
- $this->status[] = 'yield';
- $result = yield $slow;
- } catch (CanceledFailure $e) {
- $this->status[] = 'rollback';
-
- try {
- // must fail again
- $result = yield $slow;
- } catch (CanceledFailure $e) {
- $this->status[] = 'captured retry';
- }
-
- try {
- // fail since on cancelled context
- $result = yield $simple->echo('echo must fail');
- } catch (CanceledFailure $e) {
- $this->status[] = 'captured promise on cancelled';
- }
-
- $scope = Workflow::newDetachedCancellationScope(
- function () use ($simple) {
- $this->status[] = 'START rollback';
-
- $second = yield $simple->echo('rollback');
-
- $this->status[] = sprintf("RESULT (%s)", $second);
-
- if ($second !== 'ROLLBACK') {
- $this->status[] = 'FAIL rollback';
- return 'failed to compensate ' . $second;
- }
- $this->status[] = 'DONE rollback';
-
- return 'OK';
- }
- );
-
- $this->status[] = 'WAIT ROLLBACK';
- $result = yield $scope;
- $this->status[] = 'COMPLETE rollback';
- }
-
- $this->status[] = 'result: ' . $result;
- return $result;
- }
-}
diff --git a/tests/temporal/Workflow/CancelledWorkflow.php b/tests/temporal/Workflow/CancelledWorkflow.php
deleted file mode 100644
index be9f7542..00000000
--- a/tests/temporal/Workflow/CancelledWorkflow.php
+++ /dev/null
@@ -1,31 +0,0 @@
-<?php
-
-namespace Temporal\Tests\Workflow;
-
-use Temporal\Activity\ActivityOptions;
-use Temporal\Exception\Failure\CanceledFailure;
-use Temporal\Tests\Activity\SimpleActivity;
-use Temporal\Workflow;
-use Temporal\Workflow\WorkflowMethod;
-
-#[Workflow\WorkflowInterface]
-class CancelledWorkflow
-{
- #[WorkflowMethod(name: 'CancelledWorkflow')]
- public function handler()
- {
- $simple = Workflow::newActivityStub(
- SimpleActivity::class,
- ActivityOptions::new()->withStartToCloseTimeout(5)
- );
-
- // waits for 2 seconds
- $slow = $simple->slow('DOING SLOW ACTIVITY');
-
- try {
- return yield $slow;
- } catch (CanceledFailure $e) {
- return "CANCELLED";
- }
- }
-}
diff --git a/tests/temporal/Workflow/ChainedWorkflow.php b/tests/temporal/Workflow/ChainedWorkflow.php
deleted file mode 100644
index ba9c8f96..00000000
--- a/tests/temporal/Workflow/ChainedWorkflow.php
+++ /dev/null
@@ -1,31 +0,0 @@
-<?php
-
-declare(strict_types=1);
-
-namespace Temporal\Tests\Workflow;
-
-use Temporal\Activity\ActivityOptions;
-use Temporal\Workflow;
-use Temporal\Workflow\WorkflowMethod;
-
-#[Workflow\WorkflowInterface]
-class ChainedWorkflow
-{
- #[WorkflowMethod(name: 'ChainedWorkflow')]
- public function handler(string $input): iterable
- {
- $opts = ActivityOptions::new()->withStartToCloseTimeout(5);
-
- return yield Workflow::executeActivity(
- 'SimpleActivity.echo',
- [$input],
- $opts
- )->then(function ($result) use ($opts) {
- return Workflow::executeActivity(
- 'SimpleActivity.lower',
- ['Result:' . $result],
- $opts
- );
- });
- }
-}
diff --git a/tests/temporal/Workflow/ChildStubWorkflow.php b/tests/temporal/Workflow/ChildStubWorkflow.php
deleted file mode 100644
index 608962c2..00000000
--- a/tests/temporal/Workflow/ChildStubWorkflow.php
+++ /dev/null
@@ -1,30 +0,0 @@
-<?php
-
-namespace Temporal\Tests\Workflow;
-
-use Temporal\Workflow;
-use Temporal\Workflow\WorkflowMethod;
-
-#[Workflow\WorkflowInterface]
-class ChildStubWorkflow
-{
- #[WorkflowMethod(name: 'ChildStubWorkflow')]
- public function handler(
- string $input
- ) {
- // typed stub
- $simple = Workflow::newChildWorkflowStub(SimpleWorkflow::class);
-
- $result = [];
- $result[] = yield $simple->handler($input);
-
- // untyped
- $untyped = Workflow::newUntypedChildWorkflowStub('SimpleWorkflow');
- $result[] = yield $untyped->execute(['untyped']);
-
- $execution = yield $untyped->getExecution();
- assert($execution instanceof Workflow\WorkflowExecution);
-
- return $result;
- }
-}
diff --git a/tests/temporal/Workflow/ComplexExceptionalWorkflow.php b/tests/temporal/Workflow/ComplexExceptionalWorkflow.php
deleted file mode 100644
index bf65ccb2..00000000
--- a/tests/temporal/Workflow/ComplexExceptionalWorkflow.php
+++ /dev/null
@@ -1,26 +0,0 @@
-<?php
-
-namespace Temporal\Tests\Workflow;
-
-use Temporal\Activity\ActivityOptions;
-use Temporal\Common\RetryOptions;
-use Temporal\Tests\Activity\SimpleActivity;
-use Temporal\Workflow;
-use Temporal\Workflow\WorkflowMethod;
-
-#[Workflow\WorkflowInterface]
-class ComplexExceptionalWorkflow
-{
- #[WorkflowMethod(name: 'ComplexExceptionalWorkflow')]
- public function handler()
- {
- $child = Workflow::newChildWorkflowStub(
- ExceptionalActivityWorkflow::class,
- Workflow\ChildWorkflowOptions::new()->withRetryOptions(
- (new RetryOptions())->withMaximumAttempts(1)
- )
- );
-
- return yield $child->handler();
- }
-}
diff --git a/tests/temporal/Workflow/ContinuableWorkflow.php b/tests/temporal/Workflow/ContinuableWorkflow.php
deleted file mode 100644
index 78411414..00000000
--- a/tests/temporal/Workflow/ContinuableWorkflow.php
+++ /dev/null
@@ -1,38 +0,0 @@
-<?php
-
-
-namespace Temporal\Tests\Workflow;
-
-use Temporal\Activity\ActivityOptions;
-use Temporal\Workflow;
-use Temporal\Workflow\WorkflowMethod;
-use Temporal\Tests\Activity\SimpleActivity;
-
-#[Workflow\WorkflowInterface]
-class ContinuableWorkflow
-{
- #[WorkflowMethod(name: 'ContinuableWorkflow')]
- public function handler(
- int $generation
- ) {
- $simple = Workflow::newActivityStub(
- SimpleActivity::class,
- ActivityOptions::new()->withStartToCloseTimeout(5)
- );
-
- if ($generation > 5) {
- // complete
- return "OK" . $generation;
- }
-
- if ($generation !== 1) {
- assert(!empty(Workflow::getInfo()->continuedExecutionRunId));
- }
-
- for ($i = 0; $i < $generation; $i++) {
- yield $simple->echo((string)$generation);
- }
-
- return Workflow::continueAsNew('ContinuableWorkflow', [++$generation]);
- }
-}
diff --git a/tests/temporal/Workflow/EmptyWorkflow.php b/tests/temporal/Workflow/EmptyWorkflow.php
deleted file mode 100644
index 57fb5e65..00000000
--- a/tests/temporal/Workflow/EmptyWorkflow.php
+++ /dev/null
@@ -1,16 +0,0 @@
-<?php
-
-namespace Temporal\Tests\Workflow;
-
-use Temporal\Workflow\WorkflowMethod;
-use Temporal\Workflow;
-
-#[Workflow\WorkflowInterface]
-class EmptyWorkflow
-{
- #[WorkflowMethod]
- public function handler()
- {
- return 42;
- }
-}
diff --git a/tests/temporal/Workflow/ExceptionalActivityWorkflow.php b/tests/temporal/Workflow/ExceptionalActivityWorkflow.php
deleted file mode 100644
index e0ed0005..00000000
--- a/tests/temporal/Workflow/ExceptionalActivityWorkflow.php
+++ /dev/null
@@ -1,25 +0,0 @@
-<?php
-
-namespace Temporal\Tests\Workflow;
-
-use Temporal\Activity\ActivityOptions;
-use Temporal\Common\RetryOptions;
-use Temporal\Tests\Activity\SimpleActivity;
-use Temporal\Workflow;
-use Temporal\Workflow\WorkflowMethod;
-
-#[Workflow\WorkflowInterface]
-class ExceptionalActivityWorkflow
-{
- #[WorkflowMethod(name: 'ExceptionalActivityWorkflow')]
- public function handler()
- {
- $simple = Workflow::newActivityStub(
- SimpleActivity::class,
- ActivityOptions::new()->withStartToCloseTimeout(5)
- ->withRetryOptions((new RetryOptions())->withMaximumAttempts(1))
- );
-
- return yield $simple->fail();
- }
-}
diff --git a/tests/temporal/Workflow/ExceptionalWorkflow.php b/tests/temporal/Workflow/ExceptionalWorkflow.php
deleted file mode 100644
index 9a3e907f..00000000
--- a/tests/temporal/Workflow/ExceptionalWorkflow.php
+++ /dev/null
@@ -1,18 +0,0 @@
-<?php
-
-namespace Temporal\Tests\Workflow;
-
-use Temporal\Activity\ActivityOptions;
-use Temporal\Tests\Activity\SimpleActivity;
-use Temporal\Workflow;
-use Temporal\Workflow\WorkflowMethod;
-
-#[Workflow\WorkflowInterface]
-class ExceptionalWorkflow
-{
- #[WorkflowMethod(name: 'ExceptionalWorkflow')]
- public function handler()
- {
- throw new \RuntimeException("workflow error");
- }
-}
diff --git a/tests/temporal/Workflow/FailedHeartbeatWorkflow.php b/tests/temporal/Workflow/FailedHeartbeatWorkflow.php
deleted file mode 100644
index e857f100..00000000
--- a/tests/temporal/Workflow/FailedHeartbeatWorkflow.php
+++ /dev/null
@@ -1,30 +0,0 @@
-<?php
-
-declare(strict_types=1);
-
-namespace Temporal\Tests\Workflow;
-
-use Temporal\Activity\ActivityOptions;
-use Temporal\Common\RetryOptions;
-use Temporal\Workflow;
-use Temporal\Workflow\WorkflowMethod;
-use Temporal\Tests\Activity\HeartBeatActivity;
-
-#[Workflow\WorkflowInterface]
-class FailedHeartbeatWorkflow
-{
- #[WorkflowMethod(name: 'FailedHeartbeatWorkflow')]
- public function handler(
- int $iterations
- ): iterable {
- $act = Workflow::newActivityStub(
- HeartBeatActivity::class,
- ActivityOptions::new()
- ->withStartToCloseTimeout(50)
- // will fail on first attempt
- ->withRetryOptions(RetryOptions::new()->withMaximumAttempts(2))
- );
-
- return yield $act->failedActivity($iterations);
- }
-}
diff --git a/tests/temporal/Workflow/LoopWithSignalCoroutinesWorkflow.php b/tests/temporal/Workflow/LoopWithSignalCoroutinesWorkflow.php
deleted file mode 100644
index c389fd78..00000000
--- a/tests/temporal/Workflow/LoopWithSignalCoroutinesWorkflow.php
+++ /dev/null
@@ -1,55 +0,0 @@
-<?php
-
-namespace Temporal\Tests\Workflow;
-
-use Temporal\Activity\ActivityOptions;
-use Temporal\Tests\Activity\SimpleActivity;
-use Temporal\Workflow;
-use Temporal\Workflow\SignalMethod;
-use Temporal\Workflow\WorkflowMethod;
-
-#[Workflow\WorkflowInterface]
-class LoopWithSignalCoroutinesWorkflow
-{
- private array $values = [];
- private array $result = [];
- private $simple;
-
- public function __construct()
- {
- $this->simple = Workflow::newActivityStub(
- SimpleActivity::class,
- ActivityOptions::new()->withStartToCloseTimeout(5)
- );
- }
-
- #[SignalMethod]
- public function addValue(
- string $value
- ) {
- $value = yield $this->simple->prefix('in signal ', $value);
- $value = yield $this->simple->prefix('in signal 2 ', $value);
-
- $this->values[] = $value;
- }
-
- #[WorkflowMethod(name: 'LoopWithSignalCoroutinesWorkflow')]
- public function run(
- int $count
- ) {
- while (true) {
- yield Workflow::await(fn() => $this->values !== []);
- $value = array_shift($this->values);
-
- // uppercases
- $this->result[] = yield $this->simple->echo($value);
-
- if (count($this->result) === $count) {
- break;
- }
- }
-
- asort($this->result);
- return array_values($this->result);
- }
-}
diff --git a/tests/temporal/Workflow/LoopWorkflow.php b/tests/temporal/Workflow/LoopWorkflow.php
deleted file mode 100644
index 97d7a3aa..00000000
--- a/tests/temporal/Workflow/LoopWorkflow.php
+++ /dev/null
@@ -1,51 +0,0 @@
-<?php
-
-
-namespace Temporal\Tests\Workflow;
-
-use Temporal\Activity\ActivityOptions;
-use Temporal\Tests\Activity\SimpleActivity;
-use Temporal\Workflow;
-use Temporal\Workflow\SignalMethod;
-use Temporal\Workflow\WorkflowMethod;
-
-#[Workflow\WorkflowInterface]
-class LoopWorkflow
-{
- private array $values = [];
- private array $result = [];
- private $simple;
-
- public function __construct()
- {
- $this->simple = Workflow::newActivityStub(
- SimpleActivity::class,
- ActivityOptions::new()->withStartToCloseTimeout(5)
- );
- }
-
- #[SignalMethod]
- public function addValue(
- string $value
- ) {
- $this->values[] = $value;
- }
-
- #[WorkflowMethod(name: 'LoopWorkflow')]
- public function run(
- int $count
- ) {
- while (true) {
- yield Workflow::await(fn() => $this->values !== []);
- $value = array_shift($this->values);
-
- $this->result[] = yield $this->simple->echo($value);
-
- if (count($this->result) === $count) {
- break;
- }
- }
-
- return $this->result;
- }
-}
diff --git a/tests/temporal/Workflow/ParallelScopesWorkflow.php b/tests/temporal/Workflow/ParallelScopesWorkflow.php
deleted file mode 100644
index 8a2303f4..00000000
--- a/tests/temporal/Workflow/ParallelScopesWorkflow.php
+++ /dev/null
@@ -1,36 +0,0 @@
-<?php
-
-declare(strict_types=1);
-
-namespace Temporal\Tests\Workflow;
-
-use Temporal\Activity\ActivityOptions;
-use Temporal\Promise;
-use Temporal\Workflow;
-use Temporal\Workflow\WorkflowMethod;
-use Temporal\Tests\Activity\SimpleActivity;
-
-#[Workflow\WorkflowInterface]
-class ParallelScopesWorkflow
-{
- #[WorkflowMethod(name: 'ParallelScopesWorkflow')]
- public function handler(string $input)
- {
- $simple = Workflow::newActivityStub(
- SimpleActivity::class,
- ActivityOptions::new()->withStartToCloseTimeout(5)
- );
-
- $a = Workflow::newCancellationScope(function () use ($simple, $input) {
- return yield $simple->echo($input);
- });
-
- $b = Workflow::newCancellationScope(function () use ($simple, $input) {
- return yield $simple->lower($input);
- });
-
- [$ra, $rb] = yield Promise::all([$a, $b]);
-
- return sprintf('%s|%s|%s', $ra, $input, $rb);
- }
-}
diff --git a/tests/temporal/Workflow/PeriodicWorkflow.php b/tests/temporal/Workflow/PeriodicWorkflow.php
deleted file mode 100644
index 08f5f2fa..00000000
--- a/tests/temporal/Workflow/PeriodicWorkflow.php
+++ /dev/null
@@ -1,19 +0,0 @@
-<?php
-
-namespace Temporal\Tests\Workflow;
-
-use Temporal\Workflow;
-use Temporal\Workflow\WorkflowMethod;
-
-#[Workflow\WorkflowInterface]
-class PeriodicWorkflow
-{
- #[WorkflowMethod(name: 'PeriodicWorkflow')]
- public function handler()
- {
- error_log("GOT SOMETHING" . print_r(Workflow::getLastCompletionResult(), true));
-
- // todo: get last completion result
- return 'OK';
- }
-}
diff --git a/tests/temporal/Workflow/ProtoPayloadWorkflow.php b/tests/temporal/Workflow/ProtoPayloadWorkflow.php
deleted file mode 100644
index 7adbed1e..00000000
--- a/tests/temporal/Workflow/ProtoPayloadWorkflow.php
+++ /dev/null
@@ -1,33 +0,0 @@
-<?php
-
-namespace Temporal\Tests\Workflow;
-
-use Temporal\Activity\ActivityOptions;
-use Temporal\Api\Common\V1\WorkflowExecution;
-use Temporal\Tests\Activity\SimpleActivity;
-use Temporal\Workflow;
-use Temporal\Workflow\WorkflowMethod;
-
-#[Workflow\WorkflowInterface]
-class ProtoPayloadWorkflow
-{
- #[WorkflowMethod(name: 'ProtoPayloadWorkflow')]
- public function handler(): iterable
- {
- $simple = Workflow::newActivityStub(
- SimpleActivity::class,
- ActivityOptions::new()->withStartToCloseTimeout(5)
- );
-
- $e = new WorkflowExecution();
- $e->setWorkflowId('workflow id');
- $e->setRunId('run id');
-
- /** @var WorkflowExecution $e2 */
- $e2 = yield $simple->updateRunID($e);
- assert($e2->getWorkflowId() === $e->getWorkflowId());
- assert($e2->getRunId() === 'updated');
-
- return $e2;
- }
-}
diff --git a/tests/temporal/Workflow/QueryWorkflow.php b/tests/temporal/Workflow/QueryWorkflow.php
deleted file mode 100644
index 96e41582..00000000
--- a/tests/temporal/Workflow/QueryWorkflow.php
+++ /dev/null
@@ -1,41 +0,0 @@
-<?php
-
-/**
- * This file is part of Temporal package.
- *
- * For the full copyright and license information, please view the LICENSE
- * file that was distributed with this source code.
- */
-
-namespace Temporal\Tests\Workflow;
-
-use Temporal\Workflow;
-use Temporal\Workflow\WorkflowMethod;
-
-#[Workflow\WorkflowInterface]
-class QueryWorkflow
-{
- private int $counter = 0;
-
- #[Workflow\SignalMethod(name: "add")]
- public function add(
- int $value
- ) {
- $this->counter += $value;
- }
-
- #[Workflow\QueryMethod(name: "get")]
- public function get(): int
- {
- return $this->counter;
- }
-
- #[WorkflowMethod]
- public function handler()
- {
- // collect signals during one second
- yield Workflow::timer(1);
-
- return $this->counter;
- }
-}
diff --git a/tests/temporal/Workflow/RuntimeSignalWorkflow.php b/tests/temporal/Workflow/RuntimeSignalWorkflow.php
deleted file mode 100644
index f700af72..00000000
--- a/tests/temporal/Workflow/RuntimeSignalWorkflow.php
+++ /dev/null
@@ -1,31 +0,0 @@
-<?php
-
-namespace Temporal\Tests\Workflow;
-
-use React\Promise\Deferred;
-use Temporal\Workflow;
-use Temporal\Workflow\WorkflowMethod;
-
-#[Workflow\WorkflowInterface]
-class RuntimeSignalWorkflow
-{
- #[WorkflowMethod]
- public function handler()
- {
- $wait1 = new Deferred();
- $wait2 = new Deferred();
-
- $counter = 0;
-
- Workflow::registerSignal('add', function ($value) use (&$counter, $wait1, $wait2) {
- $counter += $value;
- $wait1->resolve($value);
- $wait2->resolve($value);
- });
-
- yield $wait1;
- yield $wait2;
-
- return $counter;
- }
-}
diff --git a/tests/temporal/Workflow/SagaWorkflow.php b/tests/temporal/Workflow/SagaWorkflow.php
deleted file mode 100644
index e47c0203..00000000
--- a/tests/temporal/Workflow/SagaWorkflow.php
+++ /dev/null
@@ -1,54 +0,0 @@
-<?php
-
-/**
- * This file is part of Temporal package.
- *
- * For the full copyright and license information, please view the LICENSE
- * file that was distributed with this source code.
- */
-
-namespace Temporal\Tests\Workflow;
-
-use Temporal\Activity\ActivityOptions;
-use Temporal\Common\RetryOptions;
-use Temporal\Tests\Activity\SimpleActivity;
-use Temporal\Workflow;
-
-#[Workflow\WorkflowInterface]
-class SagaWorkflow
-{
- #[Workflow\WorkflowMethod(name: 'SagaWorkflow')]
- public function run()
- {
- $simple = Workflow::newActivityStub(
- SimpleActivity::class,
- ActivityOptions::new()
- ->withStartToCloseTimeout(60)
- ->withRetryOptions(RetryOptions::new()->withMaximumAttempts(1))
- );
-
- $saga = new Workflow\Saga();
- $saga->setParallelCompensation(true);
-
- try {
- yield $simple->echo('test');
- $saga->addCompensation(
- function () use ($simple) {
- yield $simple->echo('compensate echo');
- }
- );
-
- yield $simple->lower('TEST');
- $saga->addCompensation(
- function () use ($simple) {
- yield $simple->lower('COMPENSATE LOWER');
- }
- );
-
- yield $simple->fail();
- } catch (\Throwable $e) {
- yield $saga->compensate();
- throw $e;
- }
- }
-}
diff --git a/tests/temporal/Workflow/SideEffectWorkflow.php b/tests/temporal/Workflow/SideEffectWorkflow.php
deleted file mode 100644
index 95d396e4..00000000
--- a/tests/temporal/Workflow/SideEffectWorkflow.php
+++ /dev/null
@@ -1,30 +0,0 @@
-<?php
-
-namespace Temporal\Tests\Workflow;
-
-use Temporal\Activity\ActivityOptions;
-use Temporal\Common\Uuid;
-use Temporal\Workflow;
-use Temporal\Workflow\WorkflowMethod;
-use Temporal\Tests\Activity\SimpleActivity;
-
-#[Workflow\WorkflowInterface]
-class SideEffectWorkflow
-{
- #[WorkflowMethod(name: 'SideEffectWorkflow')]
- public function handler(string $input): iterable
- {
- $simple = Workflow::newActivityStub(
- SimpleActivity::class,
- ActivityOptions::new()->withStartToCloseTimeout(5)
- );
-
- $result = yield Workflow::sideEffect(
- function () use ($input) {
- return $input . '-42';
- }
- );
-
- return yield $simple->lower($result);
- }
-}
diff --git a/tests/temporal/Workflow/SignalChildViaStubWorkflow.php b/tests/temporal/Workflow/SignalChildViaStubWorkflow.php
deleted file mode 100644
index 828086fc..00000000
--- a/tests/temporal/Workflow/SignalChildViaStubWorkflow.php
+++ /dev/null
@@ -1,25 +0,0 @@
-<?php
-
-namespace Temporal\Tests\Workflow;
-
-use Temporal\Workflow;
-use Temporal\Workflow\WorkflowMethod;
-
-#[Workflow\WorkflowInterface]
-class SignalChildViaStubWorkflow
-{
- #[WorkflowMethod(name: 'SignalChildViaStubWorkflow')]
- public function handler()
- {
- // typed stub
- $simple = Workflow::newChildWorkflowStub(SimpleSignalledWorkflow::class);
-
- // start execution
- $call = $simple->handler();
-
- yield $simple->add(8);
-
- // expects 8
- return yield $call;
- }
-}
diff --git a/tests/temporal/Workflow/SimpleDTOWorkflow.php b/tests/temporal/Workflow/SimpleDTOWorkflow.php
deleted file mode 100644
index bd39a0a0..00000000
--- a/tests/temporal/Workflow/SimpleDTOWorkflow.php
+++ /dev/null
@@ -1,35 +0,0 @@
-<?php
-
-declare(strict_types=1);
-
-namespace Temporal\Tests\Workflow;
-
-use Temporal\Activity\ActivityOptions;
-use Temporal\Workflow;
-use Temporal\Workflow\WorkflowMethod;
-use Temporal\Tests\Activity\SimpleActivity;
-use Temporal\Tests\DTO\Message;
-use Temporal\Tests\DTO\User;
-
-#[Workflow\WorkflowInterface]
-class SimpleDTOWorkflow
-{
- #[WorkflowMethod(name: 'SimpleDTOWorkflow')]//, returnType: Message::class)]
- public function handler(
- User $user
- ) {
- $simple = Workflow::newActivityStub(
- SimpleActivity::class,
- ActivityOptions::new()
- ->withStartToCloseTimeout(5)
- );
-
- $value = yield $simple->greet($user);
-
- if (!$value instanceof Message) {
- return "FAIL";
- }
-
- return $value;
- }
-}
diff --git a/tests/temporal/Workflow/SimpleHeartbeatWorkflow.php b/tests/temporal/Workflow/SimpleHeartbeatWorkflow.php
deleted file mode 100644
index c9999cd1..00000000
--- a/tests/temporal/Workflow/SimpleHeartbeatWorkflow.php
+++ /dev/null
@@ -1,25 +0,0 @@
-<?php
-
-declare(strict_types=1);
-
-namespace Temporal\Tests\Workflow;
-
-use Temporal\Activity\ActivityOptions;
-use Temporal\Workflow;
-use Temporal\Workflow\WorkflowMethod;
-use Temporal\Tests\Activity\HeartBeatActivity;
-
-#[Workflow\WorkflowInterface]
-class SimpleHeartbeatWorkflow
-{
- #[WorkflowMethod(name: 'SimpleHeartbeatWorkflow')]
- public function handler(int $iterations): iterable
- {
- $act = Workflow::newActivityStub(
- HeartBeatActivity::class,
- ActivityOptions::new()->withStartToCloseTimeout(50)
- );
-
- return yield $act->doSomething($iterations);
- }
-}
diff --git a/tests/temporal/Workflow/SimpleSignalledWorkflow.php b/tests/temporal/Workflow/SimpleSignalledWorkflow.php
deleted file mode 100644
index 0df25a65..00000000
--- a/tests/temporal/Workflow/SimpleSignalledWorkflow.php
+++ /dev/null
@@ -1,30 +0,0 @@
-<?php
-
-declare(strict_types=1);
-
-namespace Temporal\Tests\Workflow;
-
-use Temporal\Workflow;
-use Temporal\Workflow\WorkflowMethod;
-
-#[Workflow\WorkflowInterface]
-class SimpleSignalledWorkflow
-{
- private $counter = 0;
-
- #[Workflow\SignalMethod(name: "add")]
- public function add(
- int $value
- ) {
- $this->counter += $value;
- }
-
- #[WorkflowMethod(name: 'SimpleSignalledWorkflow')]
- public function handler(): iterable
- {
- // collect signals during one second
- yield Workflow::timer(1);
-
- return $this->counter;
- }
-}
diff --git a/tests/temporal/Workflow/SimpleSignalledWorkflowWithSleep.php b/tests/temporal/Workflow/SimpleSignalledWorkflowWithSleep.php
deleted file mode 100644
index d10ba04a..00000000
--- a/tests/temporal/Workflow/SimpleSignalledWorkflowWithSleep.php
+++ /dev/null
@@ -1,34 +0,0 @@
-<?php
-
-declare(strict_types=1);
-
-namespace Temporal\Tests\Workflow;
-
-use Temporal\Workflow;
-use Temporal\Workflow\WorkflowMethod;
-
-#[Workflow\WorkflowInterface]
-class SimpleSignalledWorkflowWithSleep
-{
- private $counter = 0;
-
- #[Workflow\SignalMethod(name: "add")]
- public function add(
- int $value
- ) {
- $this->counter += $value;
- }
-
- #[WorkflowMethod(name: 'SimpleSignalledWorkflowWithSleep')]
- public function handler(): iterable
- {
- // collect signals during one second
- yield Workflow::timer(1);
-
- if (!Workflow::isReplaying()) {
- sleep(1);
- }
-
- return $this->counter;
- }
-}
diff --git a/tests/temporal/Workflow/SimpleWorkflow.php b/tests/temporal/Workflow/SimpleWorkflow.php
deleted file mode 100644
index 36e12f69..00000000
--- a/tests/temporal/Workflow/SimpleWorkflow.php
+++ /dev/null
@@ -1,31 +0,0 @@
-<?php
-
-declare(strict_types=1);
-
-namespace Temporal\Tests\Workflow;
-
-use Temporal\Activity\ActivityOptions;
-use Temporal\Common\RetryOptions;
-use Temporal\Workflow;
-use Temporal\Workflow\WorkflowMethod;
-use Temporal\Tests\Activity\SimpleActivity;
-
-#[Workflow\WorkflowInterface]
-class SimpleWorkflow
-{
- #[WorkflowMethod(name: 'SimpleWorkflow')]
- public function handler(
- string $input
- ): iterable {
- $simple = Workflow::newActivityStub(
- SimpleActivity::class,
- ActivityOptions::new()
- ->withStartToCloseTimeout(5)
- ->withRetryOptions(
- RetryOptions::new()->withMaximumAttempts(2)
- )
- );
-
- return yield $simple->echo($input);
- }
-}
diff --git a/tests/temporal/Workflow/TimerWorkflow.php b/tests/temporal/Workflow/TimerWorkflow.php
deleted file mode 100644
index ab60d6c9..00000000
--- a/tests/temporal/Workflow/TimerWorkflow.php
+++ /dev/null
@@ -1,27 +0,0 @@
-<?php
-
-declare(strict_types=1);
-
-namespace Temporal\Tests\Workflow;
-
-use Temporal\Activity\ActivityOptions;
-use Temporal\Workflow;
-use Temporal\Workflow\WorkflowMethod;
-use Temporal\Tests\Activity\SimpleActivity;
-
-#[Workflow\WorkflowInterface]
-class TimerWorkflow
-{
- #[WorkflowMethod(name: 'TimerWorkflow')]
- public function handler(string $input): iterable
- {
- $simple = Workflow::newActivityStub(
- SimpleActivity::class,
- ActivityOptions::new()->withStartToCloseTimeout(5)
- );
-
- yield Workflow::timer(1);
-
- return yield $simple->lower($input);
- }
-}
diff --git a/tests/temporal/Workflow/WaitWorkflow.php b/tests/temporal/Workflow/WaitWorkflow.php
deleted file mode 100644
index 826952c1..00000000
--- a/tests/temporal/Workflow/WaitWorkflow.php
+++ /dev/null
@@ -1,33 +0,0 @@
-<?php
-
-
-namespace Temporal\Tests\Workflow;
-
-
-use Temporal\Workflow;
-use Temporal\Workflow\SignalMethod;
-use Temporal\Workflow\WorkflowInterface;
-use Temporal\Workflow\WorkflowMethod;
-
-#[WorkflowInterface]
-class WaitWorkflow
-{
- private bool $ready = false;
- private string $value;
-
- #[SignalMethod]
- public function unlock(
- string $value
- ) {
- $this->ready = true;
- $this->value = $value;
- }
-
- #[WorkflowMethod(name: 'WaitWorkflow')]
- public function run()
- {
- yield Workflow::await(fn() => $this->ready);
-
- return $this->value;
- }
-}
diff --git a/tests/temporal/Workflow/WithChildStubWorkflow.php b/tests/temporal/Workflow/WithChildStubWorkflow.php
deleted file mode 100644
index cdebe3d8..00000000
--- a/tests/temporal/Workflow/WithChildStubWorkflow.php
+++ /dev/null
@@ -1,20 +0,0 @@
-<?php
-
-declare(strict_types=1);
-
-namespace Temporal\Tests\Workflow;
-
-use Temporal\Workflow;
-use Temporal\Workflow\WorkflowMethod;
-
-#[Workflow\WorkflowInterface]
-class WithChildStubWorkflow
-{
- #[WorkflowMethod(name: 'WithChildStubWorkflow')]
- public function handler(string $input): iterable
- {
- $child = Workflow::newChildWorkflowStub(SimpleWorkflow::class);
-
- return 'Child: ' . (yield $child->handler('child ' . $input));
- }
-}
diff --git a/tests/temporal/Workflow/WithChildWorkflow.php b/tests/temporal/Workflow/WithChildWorkflow.php
deleted file mode 100644
index aac0979b..00000000
--- a/tests/temporal/Workflow/WithChildWorkflow.php
+++ /dev/null
@@ -1,25 +0,0 @@
-<?php
-
-declare(strict_types=1);
-
-namespace Temporal\Tests\Workflow;
-
-use Temporal\Workflow;
-use Temporal\Workflow\WorkflowMethod;
-
-#[Workflow\WorkflowInterface]
-class WithChildWorkflow
-{
- #[WorkflowMethod(name: 'WithChildWorkflow')]
- public function handler(
- string $input
- ): iterable {
- $result = yield Workflow::executeChildWorkflow(
- 'SimpleWorkflow',
- ['child ' . $input],
- Workflow\ChildWorkflowOptions::new()
- );
-
- return 'Child: ' . $result;
- }
-}
diff --git a/tests/temporal/Workflow/WorkflowWithSequence.php b/tests/temporal/Workflow/WorkflowWithSequence.php
deleted file mode 100644
index 9e813a9c..00000000
--- a/tests/temporal/Workflow/WorkflowWithSequence.php
+++ /dev/null
@@ -1,30 +0,0 @@
-<?php
-
-
-namespace Temporal\Tests\Workflow;
-
-use Temporal\Activity\ActivityOptions;
-use Temporal\Workflow;
-use Temporal\Workflow\WorkflowMethod;
-use Temporal\Tests\Activity\SimpleActivity;
-
-#[Workflow\WorkflowInterface]
-class WorkflowWithSequence
-{
- #[WorkflowMethod(name: 'WorkflowWithSequence')]
- public function handler()
- {
- $simple = Workflow::newActivityStub(
- SimpleActivity::class,
- ActivityOptions::new()->withStartToCloseTimeout(5)
- );
-
- $a = $simple->echo('a');
- $b = $simple->echo('b');
-
- yield $a;
- yield $b;
-
- return 'OK';
- }
-}
diff --git a/tests/temporal/Workflow/WorkflowWithSignalledSteps.php b/tests/temporal/Workflow/WorkflowWithSignalledSteps.php
deleted file mode 100644
index 5f1af766..00000000
--- a/tests/temporal/Workflow/WorkflowWithSignalledSteps.php
+++ /dev/null
@@ -1,51 +0,0 @@
-<?php
-
-
-namespace Temporal\Tests\Workflow;
-
-use React\Promise\Deferred;
-use React\Promise\PromiseInterface;
-use Temporal\Activity\ActivityOptions;
-use Temporal\Workflow;
-use Temporal\Workflow\WorkflowMethod;
-use Temporal\Tests\Activity\SimpleActivity;
-
-#[Workflow\WorkflowInterface]
-class WorkflowWithSignalledSteps
-{
- #[WorkflowMethod(name: 'WorkflowWithSignalledSteps')]
- public function handler()
- {
- $simple = Workflow::newActivityStub(
- SimpleActivity::class,
- ActivityOptions::new()->withStartToCloseTimeout(5)
- );
-
- $value = 0;
- Workflow::registerQuery('value', function () use (&$value) {
- return $value;
- });
-
- yield $this->promiseSignal('begin');
- $value++;
-
- yield $this->promiseSignal('next1');
- $value++;
-
- yield $this->promiseSignal('next2');
- $value++;
-
- return $value;
- }
-
- // is this correct?
- private function promiseSignal(string $name): PromiseInterface
- {
- $signal = new Deferred();
- Workflow::registerSignal($name, function ($value) use ($signal) {
- $signal->resolve($value);
- });
-
- return $signal->promise();
- }
-}