diff options
Diffstat (limited to 'testenv/tests/auth')
-rw-r--r-- | testenv/tests/auth/__init__.py | 43 | ||||
-rw-r--r-- | testenv/tests/auth/test_manager.py | 133 | ||||
-rw-r--r-- | testenv/tests/auth/test_service_htpasswd.py | 54 | ||||
-rw-r--r-- | testenv/tests/auth/test_service_http.py | 79 |
4 files changed, 0 insertions, 309 deletions
diff --git a/testenv/tests/auth/__init__.py b/testenv/tests/auth/__init__.py deleted file mode 100644 index 7d0d0fb4..00000000 --- a/testenv/tests/auth/__init__.py +++ /dev/null @@ -1,43 +0,0 @@ -# ========================================================================== # -# # -# KVMD - The main Pi-KVM daemon. # -# # -# Copyright (C) 2018 Maxim Devaev <[email protected]> # -# # -# This program is free software: you can redistribute it and/or modify # -# it under the terms of the GNU General Public License as published by # -# the Free Software Foundation, either version 3 of the License, or # -# (at your option) any later version. # -# # -# This program is distributed in the hope that it will be useful, # -# but WITHOUT ANY WARRANTY; without even the implied warranty of # -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # -# GNU General Public License for more details. # -# # -# You should have received a copy of the GNU General Public License # -# along with this program. If not, see <https://www.gnu.org/licenses/>. # -# # -# ========================================================================== # - - -import contextlib - -from typing import AsyncGenerator -from typing import Any - -from kvmd.yamlconf import make_config - -from kvmd.plugins.auth import BaseAuthService -from kvmd.plugins.auth import get_auth_service_class - - -# ===== -async def get_configured_auth_service(name: str, **kwargs: Any) -> AsyncGenerator[BaseAuthService, None]: - service_class = get_auth_service_class(name) - config = make_config(kwargs, service_class.get_plugin_options()) - service = service_class(**config._unpack()) # pylint: disable=protected-access - try: - yield service - finally: - await service.cleanup() diff --git a/testenv/tests/auth/test_manager.py b/testenv/tests/auth/test_manager.py deleted file mode 100644 index be6b6455..00000000 --- a/testenv/tests/auth/test_manager.py +++ /dev/null @@ -1,133 +0,0 @@ -# ========================================================================== # -# # -# KVMD - The main Pi-KVM daemon. # -# # -# Copyright (C) 2018 Maxim Devaev <[email protected]> # -# # -# This program is free software: you can redistribute it and/or modify # -# it under the terms of the GNU General Public License as published by # -# the Free Software Foundation, either version 3 of the License, or # -# (at your option) any later version. # -# # -# This program is distributed in the hope that it will be useful, # -# but WITHOUT ANY WARRANTY; without even the implied warranty of # -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # -# GNU General Public License for more details. # -# # -# You should have received a copy of the GNU General Public License # -# along with this program. If not, see <https://www.gnu.org/licenses/>. # -# # -# ========================================================================== # - - -import os -import contextlib - -from typing import List -from typing import Dict -from typing import AsyncGenerator -from typing import Optional - -import passlib.apache - -import pytest - -from kvmd.yamlconf import make_config - -from kvmd.apps.kvmd.auth import AuthManager - -from kvmd.plugins.auth import get_auth_service_class - - -# ===== -def _make_service_kwargs(path: str) -> Dict: - cls = get_auth_service_class("htpasswd") - scheme = cls.get_plugin_options() - return make_config({"file": path}, scheme)._unpack() # pylint: disable=protected-access - - -async def _get_configured_manager( - internal_path: str, - external_path: str="", - internal_users: Optional[List[str]]=None, -) -> AsyncGenerator[AuthManager, None]: - - manager = AuthManager( - internal_type="htpasswd", - internal_kwargs=_make_service_kwargs(internal_path), - external_type=("htpasswd" if external_path else ""), - external_kwargs=(_make_service_kwargs(external_path) if external_path else {}), - internal_users=(internal_users or []), - ) - - try: - yield manager - finally: - await manager.cleanup() - - -# ===== -async def test_ok__internal(tmpdir) -> None: # type: ignore - path = os.path.abspath(str(tmpdir.join("htpasswd"))) - - htpasswd = passlib.apache.HtpasswdFile(path, new=True) - htpasswd.set_password("admin", "pass") - htpasswd.save() - - async with _get_configured_manager(path) as manager: - assert manager.check("xxx") is None - manager.logout("xxx") - - assert (await manager.login("user", "foo")) is None - assert (await manager.login("admin", "foo")) is None - assert (await manager.login("user", "pass")) is None - - token = await manager.login("admin", "pass") - assert isinstance(token, str) - assert len(token) == 64 - - again = await manager.login("admin", "pass") - assert token == again - - assert manager.check(token) == "admin" - manager.logout(token) - assert manager.check(token) is None - - again = await manager.login("admin", "pass") - assert token != again - - -async def test_ok__external(tmpdir) -> None: # type: ignore - path1 = os.path.abspath(str(tmpdir.join("htpasswd1"))) - path2 = os.path.abspath(str(tmpdir.join("htpasswd2"))) - - htpasswd1 = passlib.apache.HtpasswdFile(path1, new=True) - htpasswd1.set_password("admin", "pass1") - htpasswd1.set_password("local", "foobar") - htpasswd1.save() - - htpasswd2 = passlib.apache.HtpasswdFile(path2, new=True) - htpasswd2.set_password("admin", "pass2") - htpasswd2.set_password("user", "foobar") - htpasswd2.save() - - async with _get_configured_manager(path1, path2, ["admin"]) as manager: - assert (await manager.login("local", "foobar")) is None - assert (await manager.login("admin", "pass2")) is None - - token = await manager.login("admin", "pass1") - assert token is not None - - assert manager.check(token) == "admin" - manager.logout(token) - assert manager.check(token) is None - - token = await manager.login("user", "foobar") - assert token is not None - - assert manager.check(token) == "user" - manager.logout(token) - assert manager.check(token) is None diff --git a/testenv/tests/auth/test_service_htpasswd.py b/testenv/tests/auth/test_service_htpasswd.py deleted file mode 100644 index 9fe68e6d..00000000 --- a/testenv/tests/auth/test_service_htpasswd.py +++ /dev/null @@ -1,54 +0,0 @@ -# ========================================================================== # -# # -# KVMD - The main Pi-KVM daemon. # -# # -# Copyright (C) 2018 Maxim Devaev <[email protected]> # -# # -# This program is free software: you can redistribute it and/or modify # -# it under the terms of the GNU General Public License as published by # -# the Free Software Foundation, either version 3 of the License, or # -# (at your option) any later version. # -# # -# This program is distributed in the hope that it will be useful, # -# but WITHOUT ANY WARRANTY; without even the implied warranty of # -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # -# GNU General Public License for more details. # -# # -# You should have received a copy of the GNU General Public License # -# along with this program. If not, see <https://www.gnu.org/licenses/>. # -# # -# ========================================================================== # - - -import os - -import passlib.apache - -import pytest - -from . import get_configured_auth_service - - -# ===== -async def test_ok__htpasswd_service(tmpdir) -> None: # type: ignore - path = os.path.abspath(str(tmpdir.join("htpasswd"))) - - htpasswd = passlib.apache.HtpasswdFile(path, new=True) - htpasswd.set_password("admin", "pass") - htpasswd.save() - - async with get_configured_auth_service("htpasswd", file=path) as service: - assert not (await service.authorize("user", "foo")) - assert not (await service.authorize("admin", "foo")) - assert not (await service.authorize("user", "pass")) - assert (await service.authorize("admin", "pass")) - - htpasswd.set_password("admin", "bar") - htpasswd.set_password("user", "bar") - htpasswd.save() - - assert (await service.authorize("admin", "bar")) - assert (await service.authorize("user", "bar")) - assert not (await service.authorize("admin", "foo")) - assert not (await service.authorize("user", "foo")) diff --git a/testenv/tests/auth/test_service_http.py b/testenv/tests/auth/test_service_http.py deleted file mode 100644 index 6c584dc1..00000000 --- a/testenv/tests/auth/test_service_http.py +++ /dev/null @@ -1,79 +0,0 @@ -# ========================================================================== # -# # -# KVMD - The main Pi-KVM daemon. # -# # -# Copyright (C) 2018 Maxim Devaev <[email protected]> # -# # -# This program is free software: you can redistribute it and/or modify # -# it under the terms of the GNU General Public License as published by # -# the Free Software Foundation, either version 3 of the License, or # -# (at your option) any later version. # -# # -# This program is distributed in the hope that it will be useful, # -# but WITHOUT ANY WARRANTY; without even the implied warranty of # -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # -# GNU General Public License for more details. # -# # -# You should have received a copy of the GNU General Public License # -# along with this program. If not, see <https://www.gnu.org/licenses/>. # -# # -# ========================================================================== # - - -from typing import Dict -from typing import AsyncGenerator - -import aiohttp.web -import aiohttp_basicauth - -import pytest - -from . import get_configured_auth_service - - -# ===== -async def _handle_auth(request: aiohttp.web.BaseRequest) -> aiohttp.web.Response: - status = 400 - if request.method == "POST": - credentials = (await request.json()) - if credentials["user"] == "admin" and credentials["passwd"] == "pass": - status = 200 - return aiohttp.web.Response(text=str(status), status=status) - - [email protected](name="auth_server_port") -async def _auth_server_port_fixture(aiohttp_server) -> AsyncGenerator[int, None]: # type: ignore - auth = aiohttp_basicauth.BasicAuthMiddleware( - username="server-admin", - password="server-pass", - force=False, - ) - - app = aiohttp.web.Application(middlewares=[auth]) - app.router.add_post("/auth", _handle_auth) - app.router.add_post("/auth_plus_basic", auth.required(_handle_auth)) - - server = await aiohttp_server(app) - try: - yield server.port - finally: - await server.close() - - -# ===== [email protected]("kwargs", [ - {}, - {"verify": False}, - {"user": "server-admin", "passwd": "server-pass"}, -]) -async def test_ok(auth_server_port: int, kwargs: Dict) -> None: - url = "http://localhost:%d/%s" % ( - auth_server_port, - ("auth_plus_basic" if kwargs.get("user") else "auth"), - ) - async with get_configured_auth_service("http", url=url, **kwargs) as service: - assert not (await service.authorize("user", "foobar")) - assert not (await service.authorize("admin", "foobar")) - assert not (await service.authorize("user", "pass")) - assert (await service.authorize("admin", "pass")) |