diff options
Diffstat (limited to 'testenv/tests/apps/kvmd/test_auth.py')
-rw-r--r-- | testenv/tests/apps/kvmd/test_auth.py | 133 |
1 files changed, 133 insertions, 0 deletions
diff --git a/testenv/tests/apps/kvmd/test_auth.py b/testenv/tests/apps/kvmd/test_auth.py new file mode 100644 index 00000000..be6b6455 --- /dev/null +++ b/testenv/tests/apps/kvmd/test_auth.py @@ -0,0 +1,133 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2018 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +import os +import contextlib + +from typing import List +from typing import Dict +from typing import AsyncGenerator +from typing import Optional + +import passlib.apache + +import pytest + +from kvmd.yamlconf import make_config + +from kvmd.apps.kvmd.auth import AuthManager + +from kvmd.plugins.auth import get_auth_service_class + + +# ===== +def _make_service_kwargs(path: str) -> Dict: + cls = get_auth_service_class("htpasswd") + scheme = cls.get_plugin_options() + return make_config({"file": path}, scheme)._unpack() # pylint: disable=protected-access + + +async def _get_configured_manager( + internal_path: str, + external_path: str="", + internal_users: Optional[List[str]]=None, +) -> AsyncGenerator[AuthManager, None]: + + manager = AuthManager( + internal_type="htpasswd", + internal_kwargs=_make_service_kwargs(internal_path), + external_type=("htpasswd" if external_path else ""), + external_kwargs=(_make_service_kwargs(external_path) if external_path else {}), + internal_users=(internal_users or []), + ) + + try: + yield manager + finally: + await manager.cleanup() + + +# ===== +async def test_ok__internal(tmpdir) -> None: # type: ignore + path = os.path.abspath(str(tmpdir.join("htpasswd"))) + + htpasswd = passlib.apache.HtpasswdFile(path, new=True) + htpasswd.set_password("admin", "pass") + htpasswd.save() + + async with _get_configured_manager(path) as manager: + assert manager.check("xxx") is None + manager.logout("xxx") + + assert (await manager.login("user", "foo")) is None + assert (await manager.login("admin", "foo")) is None + assert (await manager.login("user", "pass")) is None + + token = await manager.login("admin", "pass") + assert isinstance(token, str) + assert len(token) == 64 + + again = await manager.login("admin", "pass") + assert token == again + + assert manager.check(token) == "admin" + manager.logout(token) + assert manager.check(token) is None + + again = await manager.login("admin", "pass") + assert token != again + + +async def test_ok__external(tmpdir) -> None: # type: ignore + path1 = os.path.abspath(str(tmpdir.join("htpasswd1"))) + path2 = os.path.abspath(str(tmpdir.join("htpasswd2"))) + + htpasswd1 = passlib.apache.HtpasswdFile(path1, new=True) + htpasswd1.set_password("admin", "pass1") + htpasswd1.set_password("local", "foobar") + htpasswd1.save() + + htpasswd2 = passlib.apache.HtpasswdFile(path2, new=True) + htpasswd2.set_password("admin", "pass2") + htpasswd2.set_password("user", "foobar") + htpasswd2.save() + + async with _get_configured_manager(path1, path2, ["admin"]) as manager: + assert (await manager.login("local", "foobar")) is None + assert (await manager.login("admin", "pass2")) is None + + token = await manager.login("admin", "pass1") + assert token is not None + + assert manager.check(token) == "admin" + manager.logout(token) + assert manager.check(token) is None + + token = await manager.login("user", "foobar") + assert token is not None + + assert manager.check(token) == "user" + manager.logout(token) + assert manager.check(token) is None |