diff options
author | Valery Piashchynski <[email protected]> | 2021-02-10 20:00:22 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-02-10 20:00:22 +0300 |
commit | ae3dd0c3672217be0b3fb4042ef650477fba108b (patch) | |
tree | da5b08308e5aff50a102f41e254ee3620d41550e | |
parent | da64d9fbab7d73e203e7dbbb9503f4d422feaab0 (diff) |
Rewrite container for the workers
Update tests
-rwxr-xr-x | Makefile | 2 | ||||
-rwxr-xr-x | pkg/pool/static_pool.go | 5 | ||||
-rwxr-xr-x | pkg/pool/static_pool_test.go | 3 | ||||
-rwxr-xr-x | pkg/worker/state.go | 5 | ||||
-rw-r--r-- | pkg/worker_watcher/container/interface.go | 9 | ||||
-rw-r--r-- | pkg/worker_watcher/container/stack.go (renamed from pkg/worker_watcher/stack.go) | 16 | ||||
-rw-r--r-- | pkg/worker_watcher/container/stack_test.go (renamed from pkg/worker_watcher/stack_test.go) | 2 | ||||
-rw-r--r-- | pkg/worker_watcher/container/vec.go | 45 | ||||
-rw-r--r-- | pkg/worker_watcher/interface.go | 10 | ||||
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 240 | ||||
-rw-r--r-- | plugins/http/config/http.go | 2 | ||||
-rw-r--r-- | plugins/server/plugin.go | 12 | ||||
-rw-r--r-- | tests/composer.json | 5 | ||||
-rw-r--r-- | tests/plugins/server/server_plugin_test.go | 2 | ||||
-rw-r--r-- | tests/psr-worker-bench.php | 2 | ||||
-rw-r--r-- | tests/src/Activity/SimpleActivity.php | 63 | ||||
-rw-r--r-- | tests/src/Client/StartNewWorkflow.php | 23 | ||||
-rw-r--r-- | tests/src/Workflow/SagaWorkflow.php | 54 |
18 files changed, 382 insertions, 118 deletions
@@ -24,7 +24,7 @@ uninstall: ## Uninstall locally installed RR rm -f /usr/local/bin/rr test_coverage: - docker-compose -f tests/docker-compose.yaml up -d + docker-compose -f tests/docker-compose.yaml up -d --remove-orphans rm -rf coverage mkdir coverage go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/pipe.out -covermode=atomic ./pkg/transport/pipe diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index bb68151f..f1b20bb9 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -130,7 +130,8 @@ func (sp *StaticPool) Workers() (workers []worker.BaseProcess) { } func (sp *StaticPool) RemoveWorker(wb worker.BaseProcess) error { - return sp.ww.Remove(wb) + sp.ww.Remove(wb) + return nil } // Be careful, sync Exec with ExecWithContext @@ -208,6 +209,8 @@ func (sp *StaticPool) stopWorker(w worker.BaseProcess) { func (sp *StaticPool) checkMaxJobs(w worker.BaseProcess) error { const op = errors.Op("static_pool_check_max_jobs") if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { + w.State().Set(worker.StateDestroyed) + sp.ww.Remove(w) err := sp.ww.Allocate() if err != nil { return errors.E(op, err) diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go index 4c1c90e5..44f5936c 100755 --- a/pkg/pool/static_pool_test.go +++ b/pkg/pool/static_pool_test.go @@ -148,8 +148,6 @@ func Test_StaticPool_JobError(t *testing.T) { cfg, ) assert.NoError(t, err) - defer p.Destroy(ctx) - assert.NotNil(t, p) res, err := p.Exec(payload.Payload{Body: []byte("hello")}) @@ -163,6 +161,7 @@ func Test_StaticPool_JobError(t *testing.T) { } assert.Contains(t, err.Error(), "hello") + p.Destroy(ctx) } func Test_StaticPool_Broken_Replace(t *testing.T) { diff --git a/pkg/worker/state.go b/pkg/worker/state.go index c5d70a21..176e151b 100755 --- a/pkg/worker/state.go +++ b/pkg/worker/state.go @@ -32,9 +32,6 @@ const ( // StateErrored - error StateImpl (can't be used). StateErrored - - // StateRemove - worker is killed and removed from the stack - StateRemove ) type StateImpl struct { @@ -70,8 +67,6 @@ func (s *StateImpl) String() string { return "errored" case StateDestroyed: return "destroyed" - case StateRemove: - return "remove" } return "undefined" diff --git a/pkg/worker_watcher/container/interface.go b/pkg/worker_watcher/container/interface.go new file mode 100644 index 00000000..bb66897f --- /dev/null +++ b/pkg/worker_watcher/container/interface.go @@ -0,0 +1,9 @@ +package container + +import "github.com/spiral/roadrunner/v2/pkg/worker" + +type Vector interface { + Enqueue(worker.BaseProcess) + Dequeue() (worker.BaseProcess, bool) + Destroy() +} diff --git a/pkg/worker_watcher/stack.go b/pkg/worker_watcher/container/stack.go index 69e2024b..91c71b24 100644 --- a/pkg/worker_watcher/stack.go +++ b/pkg/worker_watcher/container/stack.go @@ -1,4 +1,4 @@ -package worker_watcher //nolint:golint,stylecheck +package container //nolint:golint,stylecheck import ( "context" "runtime" @@ -32,8 +32,8 @@ func (stack *Stack) Reset() { stack.workers = nil } -// Push worker back to the stack -// If stack in destroy state, Push will provide 100ms window to unlock the mutex +// Push worker back to the vec +// If vec in destroy state, Push will provide 100ms window to unlock the mutex func (stack *Stack) Push(w worker.BaseProcess) { stack.Lock() defer stack.Unlock() @@ -51,7 +51,7 @@ func (stack *Stack) Pop() (worker.BaseProcess, bool) { stack.Lock() defer stack.Unlock() - // do not release new stack + // do not release new vec if stack.destroy { return nil, true } @@ -71,7 +71,7 @@ func (stack *Stack) FindAndRemoveByPid(pid int64) bool { stack.Lock() defer stack.Unlock() for i := 0; i < len(stack.workers); i++ { - // worker in the stack, reallocating + // worker in the vec, reallocating if stack.workers[i].Pid() == pid { stack.workers = append(stack.workers[:i], stack.workers[i+1:]...) stack.actualNumOfWorkers-- @@ -83,7 +83,7 @@ func (stack *Stack) FindAndRemoveByPid(pid int64) bool { return false } -// Workers return copy of the workers in the stack +// Workers return copy of the workers in the vec func (stack *Stack) Workers() []worker.BaseProcess { stack.Lock() defer stack.Unlock() @@ -124,11 +124,11 @@ func (stack *Stack) Destroy(_ context.Context) { } stack.Unlock() // unnecessary mutex, but - // just to make sure. All stack at this moment are in the stack + // just to make sure. All vec at this moment are in the vec // Pop operation is blocked, push can't be done, since it's not possible to pop stack.Lock() for i := 0; i < len(stack.workers); i++ { - // set state for the stack in the stack (unused at the moment) + // set state for the vec in the vec (unused at the moment) stack.workers[i].State().Set(worker.StateDestroyed) // kill the worker _ = stack.workers[i].Kill() diff --git a/pkg/worker_watcher/stack_test.go b/pkg/worker_watcher/container/stack_test.go index 769419e4..59c15773 100644 --- a/pkg/worker_watcher/stack_test.go +++ b/pkg/worker_watcher/container/stack_test.go @@ -1,4 +1,4 @@ -package worker_watcher //nolint:golint,stylecheck +package container //nolint:golint,stylecheck import ( "context" "os/exec" diff --git a/pkg/worker_watcher/container/vec.go b/pkg/worker_watcher/container/vec.go new file mode 100644 index 00000000..da9822ef --- /dev/null +++ b/pkg/worker_watcher/container/vec.go @@ -0,0 +1,45 @@ +package container //nolint:golint,stylecheck + +import ( + "sync/atomic" + + "github.com/spiral/roadrunner/v2/pkg/worker" +) + +type Vec struct { + destroy uint64 + workers chan worker.BaseProcess +} + +func NewVector(initialNumOfWorkers uint64) Vector { + vec := &Vec{ + destroy: 0, + workers: make(chan worker.BaseProcess, initialNumOfWorkers), + } + + return vec +} + +func (v *Vec) Enqueue(w worker.BaseProcess) { + v.workers <- w +} + +func (v *Vec) Dequeue() (worker.BaseProcess, bool) { + /* + if *addr == old { + *addr = new + return true + } + */ + if atomic.CompareAndSwapUint64(&v.destroy, 1, 1) { + return nil, true + } + + w := <-v.workers + + return w, false +} + +func (v *Vec) Destroy() { + atomic.StoreUint64(&v.destroy, 1) +} diff --git a/pkg/worker_watcher/interface.go b/pkg/worker_watcher/interface.go index a3552e7e..4625b7a7 100644 --- a/pkg/worker_watcher/interface.go +++ b/pkg/worker_watcher/interface.go @@ -8,7 +8,7 @@ import ( // Watcher is an interface for the Sync workers lifecycle type Watcher interface { - // Watch used to add workers to the stack + // Watch used to add workers to the container Watch(workers []worker.BaseProcess) error // Get provide first free worker @@ -20,12 +20,12 @@ type Watcher interface { // Allocate - allocates new worker and put it into the WorkerWatcher Allocate() error - // Destroy destroys the underlying stack + // Destroy destroys the underlying container Destroy(ctx context.Context) - // WorkersList return all stack w/o removing it from internal storage + // WorkersList return all container w/o removing it from internal storage List() []worker.BaseProcess - // RemoveWorker remove worker from the stack - Remove(wb worker.BaseProcess) error + // RemoveWorker remove worker from the container + Remove(wb worker.BaseProcess) } diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index 2380c190..3e0633a3 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -3,33 +3,42 @@ package worker_watcher //nolint:golint,stylecheck import ( "context" "sync" + "time" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/worker" + "github.com/spiral/roadrunner/v2/pkg/worker_watcher/container" ) -// workerCreateFunc can be nil, but in that case, dead stack will not be replaced +// workerCreateFunc can be nil, but in that case, dead container will not be replaced func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) Watcher { ww := &workerWatcher{ - stack: NewWorkersStack(numWorkers), - allocator: allocator, - events: events, + container: container.NewVector(numWorkers), + numWorkers: numWorkers, + workers: make([]worker.BaseProcess, 0, numWorkers), + allocator: allocator, + events: events, } return ww } type workerWatcher struct { - mutex sync.RWMutex - stack *Stack - allocator worker.Allocator - events events.Handler + sync.RWMutex + container container.Vector + // used to control the Destroy stage (that all workers are in the container) + numWorkers uint64 + workers []worker.BaseProcess + allocator worker.Allocator + events events.Handler } func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error { for i := 0; i < len(workers); i++ { - ww.stack.Push(workers[i]) + ww.container.Enqueue(workers[i]) + // add worker to watch slice + ww.workers = append(ww.workers, workers[i]) go func(swc worker.BaseProcess) { ww.wait(swc) @@ -38,75 +47,96 @@ func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error { return nil } +// return value from Get +type get struct { + w worker.BaseProcess + err error +} + // Get is not a thread safe operation func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) { + c := make(chan get, 1) const op = errors.Op("worker_watcher_get_free_worker") - // FAST PATH - // thread safe operation - w, stop := ww.stack.Pop() - if stop { - return nil, errors.E(op, errors.WatcherStopped) - } - - // fast path, worker not nil and in the ReadyState - if w != nil && w.State().Value() == worker.StateReady { - return w, nil - } - // ========================================================= - // SLOW PATH - // Put worker back (no matter it's state, it will be killed next) - if w != nil { - ww.stack.Push(w) - } - // no free workers in the stack - // try to continuously get free one - for { - select { - default: - w, stop = ww.stack.Pop() - if stop { - return nil, errors.E(op, errors.WatcherStopped) + go func() { + // FAST PATH + // thread safe operation + w, stop := ww.container.Dequeue() + if stop { + c <- get{ + nil, + errors.E(op, errors.WatcherStopped), } - if w == nil { - continue + return + } + + // fast path, worker not nil and in the ReadyState + if w.State().Value() == worker.StateReady { + c <- get{ + w, + nil, } + return + } + // ========================================================= + // SLOW PATH + _ = w.Kill() + // no free workers in the container + // try to continuously get free one + for { + select { + default: + w, stop = ww.container.Dequeue() + if stop { + c <- get{ + nil, + errors.E(op, errors.WatcherStopped), + } + } - switch w.State().Value() { - // return only workers in the Ready state - // check first - case worker.StateReady: - return w, nil - case worker.StateRemove: - err := ww.Remove(w) - if err != nil { - return nil, errors.E(op, err) + switch w.State().Value() { + // return only workers in the Ready state + // check first + case worker.StateReady: + c <- get{ + w, + nil, + } + return + case worker.StateWorking: // how?? + ww.container.Enqueue(w) + continue + case + // all the possible wrong states + worker.StateInactive, + worker.StateDestroyed, + worker.StateErrored, + worker.StateStopped, + worker.StateInvalid, + worker.StateKilling, + worker.StateStopping: + // worker doing no work because it in the container + // so we can safely kill it (inconsistent state) + _ = w.Kill() + // try to get new worker + continue } - // try to get next - continue - case - // all the possible wrong states - worker.StateInactive, - worker.StateDestroyed, - worker.StateErrored, - worker.StateStopped, - worker.StateInvalid, - worker.StateKilling, - worker.StateWorking, // ??? how - worker.StateStopping: - // worker doing no work because it in the stack - // so we can safely kill it (inconsistent state) - _ = w.Kill() - // try to get new worker - continue } - case <-ctx.Done(): - return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the stack, timeout exceed")) } + }() + + select { + case r := <-c: + if r.err != nil { + return nil, r.err + } + return r.w, nil + case <-ctx.Done(): + return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the container, timeout exceed")) } } func (ww *workerWatcher) Allocate() error { - ww.mutex.Lock() + ww.Lock() const op = errors.Op("worker_watcher_allocate_new") sw, err := ww.allocator() if err != nil { @@ -114,47 +144,83 @@ func (ww *workerWatcher) Allocate() error { } ww.addToWatch(sw) - ww.mutex.Unlock() - ww.Push(sw) + ww.workers = append(ww.workers, sw) + + ww.Unlock() + ww.Push(sw) return nil } // Remove -func (ww *workerWatcher) Remove(wb worker.BaseProcess) error { - ww.mutex.Lock() - defer ww.mutex.Unlock() +func (ww *workerWatcher) Remove(wb worker.BaseProcess) { + ww.Lock() + defer ww.Unlock() - const op = errors.Op("worker_watcher_remove_worker") // set remove state - wb.State().Set(worker.StateRemove) - if ww.stack.FindAndRemoveByPid(wb.Pid()) { - err := wb.Kill() - if err != nil { - return errors.E(op, err) + pid := wb.Pid() + + // worker will be removed on the Get operation + for i := 0; i < len(ww.workers); i++ { + if ww.workers[i].Pid() == pid { + ww.workers = append(ww.workers[:i], ww.workers[i+1:]...) + // kill worker + _ = wb.Kill() + return } - return nil } - - return nil } // O(1) operation func (ww *workerWatcher) Push(w worker.BaseProcess) { - ww.mutex.Lock() - defer ww.mutex.Unlock() - ww.stack.Push(w) + ww.container.Enqueue(w) } -// Destroy all underlying stack (but let them to complete the task) +// Destroy all underlying container (but let them to complete the task) func (ww *workerWatcher) Destroy(ctx context.Context) { - // destroy stack, we don't use ww mutex here, since we should be able to push worker - ww.stack.Destroy(ctx) + // destroy container, we don't use ww mutex here, since we should be able to push worker + ww.Lock() + // do not release new workers + ww.container.Destroy() + ww.Unlock() + + tt := time.NewTicker(time.Millisecond * 500) + defer tt.Stop() + for { + select { + case <-tt.C: + ww.Lock() + // that might be one of the workers is working + if ww.numWorkers != uint64(len(ww.workers)) { + ww.Unlock() + continue + } + ww.Unlock() + // unnecessary mutex, but + // just to make sure. All container at this moment are in the container + // Pop operation is blocked, push can't be done, since it's not possible to pop + ww.Lock() + for i := 0; i < len(ww.workers); i++ { + ww.workers[i].State().Set(worker.StateDestroyed) + // kill the worker + _ = ww.workers[i].Kill() + } + return + } + } } // Warning, this is O(n) operation, and it will return copy of the actual workers func (ww *workerWatcher) List() []worker.BaseProcess { - return ww.stack.Workers() + ww.Lock() + defer ww.Unlock() + + base := make([]worker.BaseProcess, 0, len(ww.workers)) + for i := 0; i < len(ww.workers); i++ { + base = append(base, ww.workers[i]) + } + + return base } func (ww *workerWatcher) wait(w worker.BaseProcess) { @@ -174,7 +240,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { return } - _ = ww.stack.FindAndRemoveByPid(w.Pid()) + ww.Remove(w) err = ww.Allocate() if err != nil { ww.events.Push(events.PoolEvent{ diff --git a/plugins/http/config/http.go b/plugins/http/config/http.go index bfbc1af6..022476e2 100644 --- a/plugins/http/config/http.go +++ b/plugins/http/config/http.go @@ -73,7 +73,7 @@ func (c *HTTP) InitDefaults() error { c.Pool = &poolImpl.Config{ Debug: false, NumWorkers: uint64(runtime.NumCPU()), - MaxJobs: 1000, + MaxJobs: 0, AllocateTimeout: time.Second * 60, DestroyTimeout: time.Second * 60, Supervisor: nil, diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go index 3fc77926..95e593b8 100644 --- a/plugins/server/plugin.go +++ b/plugins/server/plugin.go @@ -49,11 +49,6 @@ func (server *Plugin) Init(cfg config.Configurer, log logger.Logger) error { server.cfg.InitDefaults() server.log = log - server.factory, err = server.initFactory() - if err != nil { - return errors.E(op, err) - } - return nil } @@ -64,7 +59,14 @@ func (server *Plugin) Name() string { // Serve (Start) server plugin (just a mock here to satisfy interface) func (server *Plugin) Serve() chan error { + const op = errors.Op("server_plugin_serve") errCh := make(chan error, 1) + var err error + server.factory, err = server.initFactory() + if err != nil { + errCh <- errors.E(op, err) + return errCh + } return errCh } diff --git a/tests/composer.json b/tests/composer.json index 52fa3a0e..cff9bd6b 100644 --- a/tests/composer.json +++ b/tests/composer.json @@ -7,5 +7,10 @@ "spiral/roadrunner-http": "^2.0", "temporal/sdk": ">=1.0", "spiral/tokenizer": ">=2.7" + }, + "autoload": { + "psr-4": { + "Temporal\\Tests\\": "src" + } } } diff --git a/tests/plugins/server/server_plugin_test.go b/tests/plugins/server/server_plugin_test.go index f600832a..c0c3c993 100644 --- a/tests/plugins/server/server_plugin_test.go +++ b/tests/plugins/server/server_plugin_test.go @@ -278,7 +278,7 @@ func TestAppWrongRelay(t *testing.T) { } err = container.Init() - assert.Error(t, err) + assert.NoError(t, err) _, err = container.Serve() assert.Error(t, err) diff --git a/tests/psr-worker-bench.php b/tests/psr-worker-bench.php index 3f634443..d0c72eae 100644 --- a/tests/psr-worker-bench.php +++ b/tests/psr-worker-bench.php @@ -49,7 +49,7 @@ if ($env->getMode() === 'http') { $worker->registerWorkflowTypes('Temporal\\Tests\\Workflow\\' . $name); } - // register all activity + // register all activity foreach ($getClasses(__DIR__ . '/src/Activity') as $name) { $class = 'Temporal\\Tests\\Activity\\' . $name; $worker->registerActivityImplementations(new $class); diff --git a/tests/src/Activity/SimpleActivity.php b/tests/src/Activity/SimpleActivity.php new file mode 100644 index 00000000..576b126e --- /dev/null +++ b/tests/src/Activity/SimpleActivity.php @@ -0,0 +1,63 @@ +<?php + +namespace Temporal\Tests\Activity; + +use Temporal\Activity\ActivityInterface; +use Temporal\Activity\ActivityMethod; +use Temporal\Api\Common\V1\WorkflowExecution; +use Temporal\DataConverter\Bytes; +use Temporal\Tests\DTO\Message; +use Temporal\Tests\DTO\User; + +#[ActivityInterface(prefix: "SimpleActivity.")] +class SimpleActivity +{ + #[ActivityMethod] + public function echo( + string $input + ): string { + return strtoupper($input); + } + + #[ActivityMethod] + public function lower( + string $input + ): string { + return strtolower($input); + } + + #[ActivityMethod] + public function greet( + User $user + ): Message { + return new Message(sprintf("Hello %s <%s>", $user->name, $user->email)); + } + + #[ActivityMethod] + public function slow( + string $input + ): string { + sleep(2); + + return strtolower($input); + } + + #[ActivityMethod] + public function sha512( + Bytes $input + ): string { + return hash("sha512", ($input->getData())); + } + + public function updateRunID(WorkflowExecution $e): WorkflowExecution + { + $e->setRunId('updated'); + return $e; + } + + #[ActivityMethod] + public function fail() + { + throw new \Error("failed activity"); + } +}
\ No newline at end of file diff --git a/tests/src/Client/StartNewWorkflow.php b/tests/src/Client/StartNewWorkflow.php new file mode 100644 index 00000000..67bc1d01 --- /dev/null +++ b/tests/src/Client/StartNewWorkflow.php @@ -0,0 +1,23 @@ +<?php + + +namespace Temporal\Tests\Client; + +use Temporal\Client; +use Temporal\Tests\Workflow\SimpleDTOWorkflow; + +use function Symfony\Component\String\s; + +class StartNewWorkflow +{ + private $stub; + + public function __construct(Client\ClientInterface $client) + { + $this->stub = $client->newWorkflowStub(SimpleDTOWorkflow::class); + } + + public function __invoke() + { + } +} diff --git a/tests/src/Workflow/SagaWorkflow.php b/tests/src/Workflow/SagaWorkflow.php new file mode 100644 index 00000000..e47c0203 --- /dev/null +++ b/tests/src/Workflow/SagaWorkflow.php @@ -0,0 +1,54 @@ +<?php + +/** + * This file is part of Temporal package. + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Temporal\Tests\Workflow; + +use Temporal\Activity\ActivityOptions; +use Temporal\Common\RetryOptions; +use Temporal\Tests\Activity\SimpleActivity; +use Temporal\Workflow; + +#[Workflow\WorkflowInterface] +class SagaWorkflow +{ + #[Workflow\WorkflowMethod(name: 'SagaWorkflow')] + public function run() + { + $simple = Workflow::newActivityStub( + SimpleActivity::class, + ActivityOptions::new() + ->withStartToCloseTimeout(60) + ->withRetryOptions(RetryOptions::new()->withMaximumAttempts(1)) + ); + + $saga = new Workflow\Saga(); + $saga->setParallelCompensation(true); + + try { + yield $simple->echo('test'); + $saga->addCompensation( + function () use ($simple) { + yield $simple->echo('compensate echo'); + } + ); + + yield $simple->lower('TEST'); + $saga->addCompensation( + function () use ($simple) { + yield $simple->lower('COMPENSATE LOWER'); + } + ); + + yield $simple->fail(); + } catch (\Throwable $e) { + yield $saga->compensate(); + throw $e; + } + } +} |