author     Valery Piashchynski <[email protected]>   2021-02-10 20:00:22 +0300
committer  Valery Piashchynski <[email protected]>   2021-02-10 20:00:22 +0300
commit     ae3dd0c3672217be0b3fb4042ef650477fba108b (patch)
tree       da5b08308e5aff50a102f41e254ee3620d41550e
parent     da64d9fbab7d73e203e7dbbb9503f4d422feaab0 (diff)
Rewrite container for the workers
Update tests
-rwxr-xr-x  Makefile                                      2
-rwxr-xr-x  pkg/pool/static_pool.go                       5
-rwxr-xr-x  pkg/pool/static_pool_test.go                  3
-rwxr-xr-x  pkg/worker/state.go                           5
-rw-r--r--  pkg/worker_watcher/container/interface.go     9
-rw-r--r--  pkg/worker_watcher/container/stack.go         16   (renamed from pkg/worker_watcher/stack.go)
-rw-r--r--  pkg/worker_watcher/container/stack_test.go    2    (renamed from pkg/worker_watcher/stack_test.go)
-rw-r--r--  pkg/worker_watcher/container/vec.go           45
-rw-r--r--  pkg/worker_watcher/interface.go               10
-rwxr-xr-x  pkg/worker_watcher/worker_watcher.go          240
-rw-r--r--  plugins/http/config/http.go                   2
-rw-r--r--  plugins/server/plugin.go                      12
-rw-r--r--  tests/composer.json                           5
-rw-r--r--  tests/plugins/server/server_plugin_test.go    2
-rw-r--r--  tests/psr-worker-bench.php                    2
-rw-r--r--  tests/src/Activity/SimpleActivity.php         63
-rw-r--r--  tests/src/Client/StartNewWorkflow.php         23
-rw-r--r--  tests/src/Workflow/SagaWorkflow.php           54
18 files changed, 382 insertions(+), 118 deletions(-)
diff --git a/Makefile b/Makefile
index 9182f840..5e5f1cd7 100755
--- a/Makefile
+++ b/Makefile
@@ -24,7 +24,7 @@ uninstall: ## Uninstall locally installed RR
rm -f /usr/local/bin/rr
test_coverage:
- docker-compose -f tests/docker-compose.yaml up -d
+ docker-compose -f tests/docker-compose.yaml up -d --remove-orphans
rm -rf coverage
mkdir coverage
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/pipe.out -covermode=atomic ./pkg/transport/pipe
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index bb68151f..f1b20bb9 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -130,7 +130,8 @@ func (sp *StaticPool) Workers() (workers []worker.BaseProcess) {
}
func (sp *StaticPool) RemoveWorker(wb worker.BaseProcess) error {
- return sp.ww.Remove(wb)
+ sp.ww.Remove(wb)
+ return nil
}
// Be careful, sync Exec with ExecWithContext
@@ -208,6 +209,8 @@ func (sp *StaticPool) stopWorker(w worker.BaseProcess) {
func (sp *StaticPool) checkMaxJobs(w worker.BaseProcess) error {
const op = errors.Op("static_pool_check_max_jobs")
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
+ w.State().Set(worker.StateDestroyed)
+ sp.ww.Remove(w)
err := sp.ww.Allocate()
if err != nil {
return errors.E(op, err)
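
Context for the hunk above: checkMaxJobs now retires an exhausted worker itself (marking it destroyed and removing it from the watcher) before allocating a replacement, and a MaxJobs of 0 (the new HTTP default further down) disables recycling entirely. A minimal sketch of that rotation, using invented worker/pool stand-ins since the real types live in the roadrunner packages:

    package main

    import "fmt"

    // invented stand-ins for worker.BaseProcess and StaticPool
    type worker struct {
        pid      int
        numExecs uint64
    }

    type pool struct {
        maxJobs uint64
        nextPID int
        active  []*worker
    }

    // checkMaxJobs mirrors the flow in the hunk above: retire the exhausted
    // worker first, then allocate a fresh one in its place
    func (p *pool) checkMaxJobs(w *worker) error {
        if p.maxJobs == 0 || w.numExecs < p.maxJobs { // MaxJobs == 0 disables recycling
            return nil
        }
        p.remove(w) // corresponds to w.State().Set(StateDestroyed); ww.Remove(w)
        return p.allocate()
    }

    func (p *pool) remove(w *worker) {
        for i := range p.active {
            if p.active[i].pid == w.pid {
                p.active = append(p.active[:i], p.active[i+1:]...)
                return
            }
        }
    }

    func (p *pool) allocate() error {
        p.nextPID++
        p.active = append(p.active, &worker{pid: p.nextPID})
        return nil
    }

    func main() {
        p := &pool{maxJobs: 2, nextPID: 1, active: []*worker{{pid: 1, numExecs: 2}}}
        _ = p.checkMaxJobs(p.active[0])
        fmt.Println(p.active[0].pid) // 2: the exhausted worker was replaced
    }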
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go
index 4c1c90e5..44f5936c 100755
--- a/pkg/pool/static_pool_test.go
+++ b/pkg/pool/static_pool_test.go
@@ -148,8 +148,6 @@ func Test_StaticPool_JobError(t *testing.T) {
cfg,
)
assert.NoError(t, err)
- defer p.Destroy(ctx)
-
assert.NotNil(t, p)
res, err := p.Exec(payload.Payload{Body: []byte("hello")})
@@ -163,6 +161,7 @@ func Test_StaticPool_JobError(t *testing.T) {
}
assert.Contains(t, err.Error(), "hello")
+ p.Destroy(ctx)
}
func Test_StaticPool_Broken_Replace(t *testing.T) {
diff --git a/pkg/worker/state.go b/pkg/worker/state.go
index c5d70a21..176e151b 100755
--- a/pkg/worker/state.go
+++ b/pkg/worker/state.go
@@ -32,9 +32,6 @@ const (
// StateErrored - error StateImpl (can't be used).
StateErrored
-
- // StateRemove - worker is killed and removed from the stack
- StateRemove
)
type StateImpl struct {
@@ -70,8 +67,6 @@ func (s *StateImpl) String() string {
return "errored"
case StateDestroyed:
return "destroyed"
- case StateRemove:
- return "remove"
}
return "undefined"
diff --git a/pkg/worker_watcher/container/interface.go b/pkg/worker_watcher/container/interface.go
new file mode 100644
index 00000000..bb66897f
--- /dev/null
+++ b/pkg/worker_watcher/container/interface.go
@@ -0,0 +1,9 @@
+package container
+
+import "github.com/spiral/roadrunner/v2/pkg/worker"
+
+type Vector interface {
+ Enqueue(worker.BaseProcess)
+ Dequeue() (worker.BaseProcess, bool)
+ Destroy()
+}
diff --git a/pkg/worker_watcher/stack.go b/pkg/worker_watcher/container/stack.go
index 69e2024b..91c71b24 100644
--- a/pkg/worker_watcher/stack.go
+++ b/pkg/worker_watcher/container/stack.go
@@ -1,4 +1,4 @@
-package worker_watcher //nolint:golint,stylecheck
+package container //nolint:golint,stylecheck
import (
"context"
"runtime"
@@ -32,8 +32,8 @@ func (stack *Stack) Reset() {
stack.workers = nil
}
-// Push worker back to the stack
-// If stack in destroy state, Push will provide 100ms window to unlock the mutex
+// Push worker back to the vec
+// If the vec is in the destroy state, Push will provide a 100ms window to unlock the mutex
func (stack *Stack) Push(w worker.BaseProcess) {
stack.Lock()
defer stack.Unlock()
@@ -51,7 +51,7 @@ func (stack *Stack) Pop() (worker.BaseProcess, bool) {
stack.Lock()
defer stack.Unlock()
- // do not release new stack
+ // do not release new vec
if stack.destroy {
return nil, true
}
@@ -71,7 +71,7 @@ func (stack *Stack) FindAndRemoveByPid(pid int64) bool {
stack.Lock()
defer stack.Unlock()
for i := 0; i < len(stack.workers); i++ {
- // worker in the stack, reallocating
+ // worker in the vec, reallocating
if stack.workers[i].Pid() == pid {
stack.workers = append(stack.workers[:i], stack.workers[i+1:]...)
stack.actualNumOfWorkers--
@@ -83,7 +83,7 @@ func (stack *Stack) FindAndRemoveByPid(pid int64) bool {
return false
}
-// Workers return copy of the workers in the stack
+// Workers return copy of the workers in the vec
func (stack *Stack) Workers() []worker.BaseProcess {
stack.Lock()
defer stack.Unlock()
@@ -124,11 +124,11 @@ func (stack *Stack) Destroy(_ context.Context) {
}
stack.Unlock()
// unnecessary mutex, but
- // just to make sure. All stack at this moment are in the stack
+ // just to make sure. All workers at this moment are in the vec
// Pop operation is blocked, push can't be done, since it's not possible to pop
stack.Lock()
for i := 0; i < len(stack.workers); i++ {
- // set state for the stack in the stack (unused at the moment)
+ // set state for the workers in the vec (unused at the moment)
stack.workers[i].State().Set(worker.StateDestroyed)
// kill the worker
_ = stack.workers[i].Kill()
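
The move to pkg/worker_watcher/container keeps the Stack type with its non-blocking Pop (an empty stack returns immediately, which is what forced the polling loop in the old Get), while the new Vec below blocks in Dequeue until a worker is enqueued. A minimal sketch of the behavioral difference, with int standing in for worker.BaseProcess:

    package main

    import "fmt"

    // old behavior: non-blocking pop; the caller has to retry when it gets no worker
    func pop(s *[]int) (int, bool) {
        if len(*s) == 0 {
            return 0, false
        }
        w := (*s)[len(*s)-1]
        *s = (*s)[:len(*s)-1]
        return w, true
    }

    func main() {
        var stack []int
        if _, ok := pop(&stack); !ok {
            fmt.Println("empty stack: caller must retry") // busy-wait in the old Get
        }

        // new behavior: a channel blocks the receiver until Enqueue happens
        vec := make(chan int, 1)
        go func() { vec <- 42 }() // Enqueue from another goroutine
        fmt.Println(<-vec)        // Dequeue waits here instead of spinning
    }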
diff --git a/pkg/worker_watcher/stack_test.go b/pkg/worker_watcher/container/stack_test.go
index 769419e4..59c15773 100644
--- a/pkg/worker_watcher/stack_test.go
+++ b/pkg/worker_watcher/container/stack_test.go
@@ -1,4 +1,4 @@
-package worker_watcher //nolint:golint,stylecheck
+package container //nolint:golint,stylecheck
import (
"context"
"os/exec"
diff --git a/pkg/worker_watcher/container/vec.go b/pkg/worker_watcher/container/vec.go
new file mode 100644
index 00000000..da9822ef
--- /dev/null
+++ b/pkg/worker_watcher/container/vec.go
@@ -0,0 +1,45 @@
+package container //nolint:golint,stylecheck
+
+import (
+ "sync/atomic"
+
+ "github.com/spiral/roadrunner/v2/pkg/worker"
+)
+
+type Vec struct {
+ destroy uint64
+ workers chan worker.BaseProcess
+}
+
+func NewVector(initialNumOfWorkers uint64) Vector {
+ vec := &Vec{
+ destroy: 0,
+ workers: make(chan worker.BaseProcess, initialNumOfWorkers),
+ }
+
+ return vec
+}
+
+func (v *Vec) Enqueue(w worker.BaseProcess) {
+ v.workers <- w
+}
+
+func (v *Vec) Dequeue() (worker.BaseProcess, bool) {
+	/*
+		CompareAndSwap semantics:
+			if *addr == old {
+				*addr = new
+				return true
+			}
+		so CAS(&v.destroy, 1, 1) atomically checks whether destroy == 1
+	*/
+ if atomic.CompareAndSwapUint64(&v.destroy, 1, 1) {
+ return nil, true
+ }
+
+ w := <-v.workers
+
+ return w, false
+}
+
+func (v *Vec) Destroy() {
+ atomic.StoreUint64(&v.destroy, 1)
+}
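
A note on the Dequeue check above: atomic.CompareAndSwapUint64(&v.destroy, 1, 1) succeeds only when the flag already equals 1, so it is an atomic read dressed up as a swap; atomic.LoadUint64 expresses the same test directly. Also visible from the code: a goroutine already parked on <-v.workers when Destroy sets the flag keeps waiting until something is enqueued. A small sketch of the equivalence:

    package main

    import (
        "fmt"
        "sync/atomic"
    )

    func main() {
        var destroy uint64

        // CAS(old=1, new=1) is a no-op swap: it only reports whether the flag is 1
        fmt.Println(atomic.CompareAndSwapUint64(&destroy, 1, 1)) // false
        fmt.Println(atomic.LoadUint64(&destroy) == 1)            // false, same answer

        atomic.StoreUint64(&destroy, 1) // what Destroy() does

        fmt.Println(atomic.CompareAndSwapUint64(&destroy, 1, 1)) // true
        fmt.Println(atomic.LoadUint64(&destroy) == 1)            // true
    }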
diff --git a/pkg/worker_watcher/interface.go b/pkg/worker_watcher/interface.go
index a3552e7e..4625b7a7 100644
--- a/pkg/worker_watcher/interface.go
+++ b/pkg/worker_watcher/interface.go
@@ -8,7 +8,7 @@ import (
// Watcher is an interface for the Sync workers lifecycle
type Watcher interface {
- // Watch used to add workers to the stack
+ // Watch adds workers to the container
Watch(workers []worker.BaseProcess) error
// Get provide first free worker
@@ -20,12 +20,12 @@ type Watcher interface {
// Allocate - allocates new worker and put it into the WorkerWatcher
Allocate() error
- // Destroy destroys the underlying stack
+ // Destroy destroys the underlying container
Destroy(ctx context.Context)
- // WorkersList return all stack w/o removing it from internal storage
+ // List returns all workers w/o removing them from the internal storage
List() []worker.BaseProcess
- // RemoveWorker remove worker from the stack
- Remove(wb worker.BaseProcess) error
+ // Remove removes a worker from the container
+ Remove(wb worker.BaseProcess)
}
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index 2380c190..3e0633a3 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -3,33 +3,42 @@ package worker_watcher //nolint:golint,stylecheck
import (
"context"
"sync"
+ "time"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/worker"
+ "github.com/spiral/roadrunner/v2/pkg/worker_watcher/container"
)
-// workerCreateFunc can be nil, but in that case, dead stack will not be replaced
+// workerCreateFunc can be nil, but in that case, dead workers will not be replaced
func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) Watcher {
ww := &workerWatcher{
- stack: NewWorkersStack(numWorkers),
- allocator: allocator,
- events: events,
+ container: container.NewVector(numWorkers),
+ numWorkers: numWorkers,
+ workers: make([]worker.BaseProcess, 0, numWorkers),
+ allocator: allocator,
+ events: events,
}
return ww
}
type workerWatcher struct {
- mutex sync.RWMutex
- stack *Stack
- allocator worker.Allocator
- events events.Handler
+ sync.RWMutex
+ container container.Vector
+ // used to control the Destroy stage (that all workers are in the container)
+ numWorkers uint64
+ workers []worker.BaseProcess
+ allocator worker.Allocator
+ events events.Handler
}
func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error {
for i := 0; i < len(workers); i++ {
- ww.stack.Push(workers[i])
+ ww.container.Enqueue(workers[i])
+ // add worker to watch slice
+ ww.workers = append(ww.workers, workers[i])
go func(swc worker.BaseProcess) {
ww.wait(swc)
@@ -38,75 +47,96 @@ func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error {
return nil
}
+// return value from Get
+type get struct {
+ w worker.BaseProcess
+ err error
+}
+
// Get is not a thread safe operation
func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) {
+ c := make(chan get, 1)
const op = errors.Op("worker_watcher_get_free_worker")
- // FAST PATH
- // thread safe operation
- w, stop := ww.stack.Pop()
- if stop {
- return nil, errors.E(op, errors.WatcherStopped)
- }
-
- // fast path, worker not nil and in the ReadyState
- if w != nil && w.State().Value() == worker.StateReady {
- return w, nil
- }
- // =========================================================
- // SLOW PATH
- // Put worker back (no matter it's state, it will be killed next)
- if w != nil {
- ww.stack.Push(w)
- }
- // no free workers in the stack
- // try to continuously get free one
- for {
- select {
- default:
- w, stop = ww.stack.Pop()
- if stop {
- return nil, errors.E(op, errors.WatcherStopped)
+ go func() {
+ // FAST PATH
+ // thread safe operation
+ w, stop := ww.container.Dequeue()
+ if stop {
+ c <- get{
+ nil,
+ errors.E(op, errors.WatcherStopped),
}
- if w == nil {
- continue
+ return
+ }
+
+ // fast path, worker not nil and in the ReadyState
+ if w.State().Value() == worker.StateReady {
+ c <- get{
+ w,
+ nil,
}
+ return
+ }
+ // =========================================================
+ // SLOW PATH
+ _ = w.Kill()
+ // no free workers in the container
+ // try to continuously get free one
+ for {
+ select {
+ default:
+ w, stop = ww.container.Dequeue()
+ if stop {
+ c <- get{
+ nil,
+ errors.E(op, errors.WatcherStopped),
+ }
+ }
- switch w.State().Value() {
- // return only workers in the Ready state
- // check first
- case worker.StateReady:
- return w, nil
- case worker.StateRemove:
- err := ww.Remove(w)
- if err != nil {
- return nil, errors.E(op, err)
+ switch w.State().Value() {
+ // return only workers in the Ready state
+ // check first
+ case worker.StateReady:
+ c <- get{
+ w,
+ nil,
+ }
+ return
+ case worker.StateWorking: // unexpected: a working worker should not be in the container; requeue it
+ ww.container.Enqueue(w)
+ continue
+ case
+ // all the possible wrong states
+ worker.StateInactive,
+ worker.StateDestroyed,
+ worker.StateErrored,
+ worker.StateStopped,
+ worker.StateInvalid,
+ worker.StateKilling,
+ worker.StateStopping:
+ // the worker is doing no work because it is in the container
+ // so we can safely kill it (inconsistent state)
+ _ = w.Kill()
+ // try to get new worker
+ continue
}
- // try to get next
- continue
- case
- // all the possible wrong states
- worker.StateInactive,
- worker.StateDestroyed,
- worker.StateErrored,
- worker.StateStopped,
- worker.StateInvalid,
- worker.StateKilling,
- worker.StateWorking, // ??? how
- worker.StateStopping:
- // worker doing no work because it in the stack
- // so we can safely kill it (inconsistent state)
- _ = w.Kill()
- // try to get new worker
- continue
}
- case <-ctx.Done():
- return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the stack, timeout exceed"))
}
+ }()
+
+ select {
+ case r := <-c:
+ if r.err != nil {
+ return nil, r.err
+ }
+ return r.w, nil
+ case <-ctx.Done():
+ return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the container, timeout exceeded"))
}
}
func (ww *workerWatcher) Allocate() error {
- ww.mutex.Lock()
+ ww.Lock()
const op = errors.Op("worker_watcher_allocate_new")
sw, err := ww.allocator()
if err != nil {
@@ -114,47 +144,83 @@ func (ww *workerWatcher) Allocate() error {
}
ww.addToWatch(sw)
- ww.mutex.Unlock()
- ww.Push(sw)
+ ww.workers = append(ww.workers, sw)
+
+ ww.Unlock()
+ ww.Push(sw)
return nil
}
// Remove
-func (ww *workerWatcher) Remove(wb worker.BaseProcess) error {
- ww.mutex.Lock()
- defer ww.mutex.Unlock()
+func (ww *workerWatcher) Remove(wb worker.BaseProcess) {
+ ww.Lock()
+ defer ww.Unlock()
- const op = errors.Op("worker_watcher_remove_worker")
// set remove state
- wb.State().Set(worker.StateRemove)
- if ww.stack.FindAndRemoveByPid(wb.Pid()) {
- err := wb.Kill()
- if err != nil {
- return errors.E(op, err)
+ pid := wb.Pid()
+
+ // worker will be removed on the Get operation
+ for i := 0; i < len(ww.workers); i++ {
+ if ww.workers[i].Pid() == pid {
+ ww.workers = append(ww.workers[:i], ww.workers[i+1:]...)
+ // kill worker
+ _ = wb.Kill()
+ return
}
- return nil
}
-
- return nil
}
// O(1) operation
func (ww *workerWatcher) Push(w worker.BaseProcess) {
- ww.mutex.Lock()
- defer ww.mutex.Unlock()
- ww.stack.Push(w)
+ ww.container.Enqueue(w)
}
-// Destroy all underlying stack (but let them to complete the task)
+// Destroy destroys all workers in the container (but lets them complete the current task)
func (ww *workerWatcher) Destroy(ctx context.Context) {
- // destroy stack, we don't use ww mutex here, since we should be able to push worker
- ww.stack.Destroy(ctx)
+ // mark the container as destroyed under the watcher lock
+ ww.Lock()
+ // do not release new workers
+ ww.container.Destroy()
+ ww.Unlock()
+
+ tt := time.NewTicker(time.Millisecond * 500)
+ defer tt.Stop()
+ for {
+ select {
+ case <-tt.C:
+ ww.Lock()
+ // some workers might still be working and not yet returned
+ if ww.numWorkers != uint64(len(ww.workers)) {
+ ww.Unlock()
+ continue
+ }
+ ww.Unlock()
+ // unnecessary mutex, but
+ // just to make sure. All workers at this moment are in the container
+ // Dequeue is stopped; no worker can leave the container anymore
+ ww.Lock()
+ for i := 0; i < len(ww.workers); i++ {
+ ww.workers[i].State().Set(worker.StateDestroyed)
+ // kill the worker
+ _ = ww.workers[i].Kill()
+ }
+ return
+ }
+ }
}
// Warning, this is O(n) operation, and it will return copy of the actual workers
func (ww *workerWatcher) List() []worker.BaseProcess {
- return ww.stack.Workers()
+ ww.Lock()
+ defer ww.Unlock()
+
+ base := make([]worker.BaseProcess, 0, len(ww.workers))
+ for i := 0; i < len(ww.workers); i++ {
+ base = append(base, ww.workers[i])
+ }
+
+ return base
}
func (ww *workerWatcher) wait(w worker.BaseProcess) {
@@ -174,7 +240,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
return
}
- _ = ww.stack.FindAndRemoveByPid(w.Pid())
+ ww.Remove(w)
err = ww.Allocate()
if err != nil {
ww.events.Push(events.PoolEvent{
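
The rewritten Get moves the blocking dequeue into a goroutine and lets the caller select between the buffered result channel and ctx.Done(), replacing the old busy-wait with a context-bounded one; Destroy applies a similar waiting strategy, polling on a 500ms ticker until every watched worker is back. A minimal, self-contained sketch of the Get pattern, with a string channel standing in for the container:

    package main

    import (
        "context"
        "errors"
        "fmt"
        "time"
    )

    // minimal sketch of the new Get pattern: the blocking dequeue runs in a
    // goroutine and the caller selects between its result and ctx.Done()
    func get(ctx context.Context, workers chan string) (string, error) {
        type result struct {
            w string
        }
        c := make(chan result, 1) // buffered, so the goroutine never blocks on send
        go func() {
            w := <-workers // blocks like container.Dequeue
            c <- result{w: w}
        }()
        select {
        case r := <-c:
            return r.w, nil
        case <-ctx.Done():
            // as in the real code, the goroutine stays parked on the dequeue
            // until another worker shows up
            return "", errors.New("no free workers in the container, timeout exceeded")
        }
    }

    func main() {
        workers := make(chan string, 1)
        workers <- "worker-1"
        w, _ := get(context.Background(), workers)
        fmt.Println(w) // worker-1

        ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
        defer cancel()
        _, err := get(ctx, workers) // empty container: wait is bounded by ctx
        fmt.Println(err)
    }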
diff --git a/plugins/http/config/http.go b/plugins/http/config/http.go
index bfbc1af6..022476e2 100644
--- a/plugins/http/config/http.go
+++ b/plugins/http/config/http.go
@@ -73,7 +73,7 @@ func (c *HTTP) InitDefaults() error {
c.Pool = &poolImpl.Config{
Debug: false,
NumWorkers: uint64(runtime.NumCPU()),
- MaxJobs: 1000,
+ MaxJobs: 0,
AllocateTimeout: time.Second * 60,
DestroyTimeout: time.Second * 60,
Supervisor: nil,
diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go
index 3fc77926..95e593b8 100644
--- a/plugins/server/plugin.go
+++ b/plugins/server/plugin.go
@@ -49,11 +49,6 @@ func (server *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
server.cfg.InitDefaults()
server.log = log
- server.factory, err = server.initFactory()
- if err != nil {
- return errors.E(op, err)
- }
-
return nil
}
@@ -64,7 +59,14 @@ func (server *Plugin) Name() string {
// Serve (Start) server plugin (just a mock here to satisfy interface)
func (server *Plugin) Serve() chan error {
+ const op = errors.Op("server_plugin_serve")
errCh := make(chan error, 1)
+ var err error
+ server.factory, err = server.initFactory()
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
return errCh
}
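
Moving the factory construction out of Init means container.Init() no longer fails on a bad server config; the error now surfaces through the channel returned by Serve, which is what the test change below flips (Init asserts NoError, Serve still asserts Error). A small sketch of the Init/Serve split, with invented plugin and factory stand-ins:

    package main

    import (
        "errors"
        "fmt"
    )

    // invented stand-ins for the plugin and its worker factory
    type plugin struct {
        initFactory func() error
    }

    // Init only wires configuration; nothing fallible is left here
    func (p *plugin) Init() error {
        return nil
    }

    // Serve performs the fallible factory construction and reports failure
    // through the returned channel instead of failing container.Init()
    func (p *plugin) Serve() chan error {
        errCh := make(chan error, 1)
        if err := p.initFactory(); err != nil {
            errCh <- err
            return errCh
        }
        return errCh
    }

    func main() {
        p := &plugin{initFactory: func() error { return errors.New("wrong relay") }}
        fmt.Println(p.Init())    // <nil>: Init succeeds even with a bad config
        fmt.Println(<-p.Serve()) // wrong relay: surfaced at Serve time
    }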
diff --git a/tests/composer.json b/tests/composer.json
index 52fa3a0e..cff9bd6b 100644
--- a/tests/composer.json
+++ b/tests/composer.json
@@ -7,5 +7,10 @@
"spiral/roadrunner-http": "^2.0",
"temporal/sdk": ">=1.0",
"spiral/tokenizer": ">=2.7"
+ },
+ "autoload": {
+ "psr-4": {
+ "Temporal\\Tests\\": "src"
+ }
}
}
diff --git a/tests/plugins/server/server_plugin_test.go b/tests/plugins/server/server_plugin_test.go
index f600832a..c0c3c993 100644
--- a/tests/plugins/server/server_plugin_test.go
+++ b/tests/plugins/server/server_plugin_test.go
@@ -278,7 +278,7 @@ func TestAppWrongRelay(t *testing.T) {
}
err = container.Init()
- assert.Error(t, err)
+ assert.NoError(t, err)
_, err = container.Serve()
assert.Error(t, err)
diff --git a/tests/psr-worker-bench.php b/tests/psr-worker-bench.php
index 3f634443..d0c72eae 100644
--- a/tests/psr-worker-bench.php
+++ b/tests/psr-worker-bench.php
@@ -49,7 +49,7 @@ if ($env->getMode() === 'http') {
$worker->registerWorkflowTypes('Temporal\\Tests\\Workflow\\' . $name);
}
- // register all activity
+ // register all activities
foreach ($getClasses(__DIR__ . '/src/Activity') as $name) {
$class = 'Temporal\\Tests\\Activity\\' . $name;
$worker->registerActivityImplementations(new $class);
diff --git a/tests/src/Activity/SimpleActivity.php b/tests/src/Activity/SimpleActivity.php
new file mode 100644
index 00000000..576b126e
--- /dev/null
+++ b/tests/src/Activity/SimpleActivity.php
@@ -0,0 +1,63 @@
+<?php
+
+namespace Temporal\Tests\Activity;
+
+use Temporal\Activity\ActivityInterface;
+use Temporal\Activity\ActivityMethod;
+use Temporal\Api\Common\V1\WorkflowExecution;
+use Temporal\DataConverter\Bytes;
+use Temporal\Tests\DTO\Message;
+use Temporal\Tests\DTO\User;
+
+#[ActivityInterface(prefix: "SimpleActivity.")]
+class SimpleActivity
+{
+ #[ActivityMethod]
+ public function echo(
+ string $input
+ ): string {
+ return strtoupper($input);
+ }
+
+ #[ActivityMethod]
+ public function lower(
+ string $input
+ ): string {
+ return strtolower($input);
+ }
+
+ #[ActivityMethod]
+ public function greet(
+ User $user
+ ): Message {
+ return new Message(sprintf("Hello %s <%s>", $user->name, $user->email));
+ }
+
+ #[ActivityMethod]
+ public function slow(
+ string $input
+ ): string {
+ sleep(2);
+
+ return strtolower($input);
+ }
+
+ #[ActivityMethod]
+ public function sha512(
+ Bytes $input
+ ): string {
+ return hash("sha512", ($input->getData()));
+ }
+
+ public function updateRunID(WorkflowExecution $e): WorkflowExecution
+ {
+ $e->setRunId('updated');
+ return $e;
+ }
+
+ #[ActivityMethod]
+ public function fail()
+ {
+ throw new \Error("failed activity");
+ }
+}
\ No newline at end of file
diff --git a/tests/src/Client/StartNewWorkflow.php b/tests/src/Client/StartNewWorkflow.php
new file mode 100644
index 00000000..67bc1d01
--- /dev/null
+++ b/tests/src/Client/StartNewWorkflow.php
@@ -0,0 +1,23 @@
+<?php
+
+
+namespace Temporal\Tests\Client;
+
+use Temporal\Client;
+use Temporal\Tests\Workflow\SimpleDTOWorkflow;
+
+use function Symfony\Component\String\s;
+
+class StartNewWorkflow
+{
+ private $stub;
+
+ public function __construct(Client\ClientInterface $client)
+ {
+ $this->stub = $client->newWorkflowStub(SimpleDTOWorkflow::class);
+ }
+
+ public function __invoke()
+ {
+ }
+}
diff --git a/tests/src/Workflow/SagaWorkflow.php b/tests/src/Workflow/SagaWorkflow.php
new file mode 100644
index 00000000..e47c0203
--- /dev/null
+++ b/tests/src/Workflow/SagaWorkflow.php
@@ -0,0 +1,54 @@
+<?php
+
+/**
+ * This file is part of Temporal package.
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Temporal\Tests\Workflow;
+
+use Temporal\Activity\ActivityOptions;
+use Temporal\Common\RetryOptions;
+use Temporal\Tests\Activity\SimpleActivity;
+use Temporal\Workflow;
+
+#[Workflow\WorkflowInterface]
+class SagaWorkflow
+{
+ #[Workflow\WorkflowMethod(name: 'SagaWorkflow')]
+ public function run()
+ {
+ $simple = Workflow::newActivityStub(
+ SimpleActivity::class,
+ ActivityOptions::new()
+ ->withStartToCloseTimeout(60)
+ ->withRetryOptions(RetryOptions::new()->withMaximumAttempts(1))
+ );
+
+ $saga = new Workflow\Saga();
+ $saga->setParallelCompensation(true);
+
+ try {
+ yield $simple->echo('test');
+ $saga->addCompensation(
+ function () use ($simple) {
+ yield $simple->echo('compensate echo');
+ }
+ );
+
+ yield $simple->lower('TEST');
+ $saga->addCompensation(
+ function () use ($simple) {
+ yield $simple->lower('COMPENSATE LOWER');
+ }
+ );
+
+ yield $simple->fail();
+ } catch (\Throwable $e) {
+ yield $saga->compensate();
+ throw $e;
+ }
+ }
+}