diff options
author | Devaev Maxim <[email protected]> | 2020-02-29 17:23:57 +0300 |
---|---|---|
committer | Devaev Maxim <[email protected]> | 2020-02-29 17:23:57 +0300 |
commit | 75d9b858d73bf3ed31597e554b27520e3e31f72e (patch) | |
tree | f52e60f468d7ad99d8e9abf2df590026a4502862 /testenv/tests/test_aioregion.py | |
parent | 5ef5e00da9e5d15270cdc253eb67bef72174b78f (diff) |
moved AioExclusiveRegion to aiotools
Diffstat (limited to 'testenv/tests/test_aioregion.py')
-rw-r--r-- | testenv/tests/test_aioregion.py | 117 |
1 files changed, 0 insertions, 117 deletions
diff --git a/testenv/tests/test_aioregion.py b/testenv/tests/test_aioregion.py deleted file mode 100644 index 4448bc54..00000000 --- a/testenv/tests/test_aioregion.py +++ /dev/null @@ -1,117 +0,0 @@ -# ========================================================================== # -# # -# KVMD - The main Pi-KVM daemon. # -# # -# Copyright (C) 2018 Maxim Devaev <[email protected]> # -# # -# This program is free software: you can redistribute it and/or modify # -# it under the terms of the GNU General Public License as published by # -# the Free Software Foundation, either version 3 of the License, or # -# (at your option) any later version. # -# # -# This program is distributed in the hope that it will be useful, # -# but WITHOUT ANY WARRANTY; without even the implied warranty of # -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # -# GNU General Public License for more details. # -# # -# You should have received a copy of the GNU General Public License # -# along with this program. If not, see <https://www.gnu.org/licenses/>. # -# # -# ========================================================================== # - - -import asyncio - -import pytest - -from kvmd.aioregion import AioExclusiveRegion - - -# ===== -class RegionIsBusyError(Exception): - pass - - -# ===== -async def test_ok__access_one() -> None: - region = AioExclusiveRegion(RegionIsBusyError) - - async def func() -> None: - assert not region.is_busy() - with region: - assert region.is_busy() - assert not region.is_busy() - - await func() - - assert not region.is_busy() - region.exit() - assert not region.is_busy() - - -async def test_fail__access_one() -> None: - region = AioExclusiveRegion(RegionIsBusyError) - - async def func() -> None: - assert not region.is_busy() - with region: - assert region.is_busy() - region.enter() - assert not region.is_busy() - - with pytest.raises(RegionIsBusyError): - await func() - - assert not region.is_busy() - region.exit() - assert not region.is_busy() - - -# ===== -async def test_ok__access_two() -> None: - region = AioExclusiveRegion(RegionIsBusyError) - - async def func1() -> None: - with region: - await asyncio.sleep(1) - print("done func1()") - - async def func2() -> None: - await asyncio.sleep(2) - print("waiking up func2()") - with region: - await asyncio.sleep(1) - print("done func2()") - - await asyncio.gather(func1(), func2()) - - assert not region.is_busy() - region.exit() - assert not region.is_busy() - - -async def test_fail__access_two() -> None: - region = AioExclusiveRegion(RegionIsBusyError) - - async def func1() -> None: - with region: - await asyncio.sleep(2) - print("done func1()") - - async def func2() -> None: - await asyncio.sleep(1) - with region: - await asyncio.sleep(1) - print("done func2()") - - results = await asyncio.gather(func1(), func2(), return_exceptions=True) - assert results[0] is None - assert type(results[1]) == RegionIsBusyError # pylint: disable=unidiomatic-typecheck - - assert not region.is_busy() - region.exit() - assert not region.is_busy() |