diff options
-rw-r--r-- | kvmd/apps/__init__.py | 4 | ||||
-rw-r--r-- | kvmd/apps/kvmd/__init__.py | 4 | ||||
-rw-r--r-- | kvmd/apps/kvmd/auth.py | 10 | ||||
-rw-r--r-- | testenv/tests/auth/test_manager.py | 134 |
4 files changed, 144 insertions, 8 deletions
diff --git a/kvmd/apps/__init__.py b/kvmd/apps/__init__.py index 745a20e9..eea8ea71 100644 --- a/kvmd/apps/__init__.py +++ b/kvmd/apps/__init__.py @@ -153,11 +153,11 @@ def _get_config_scheme() -> Dict: }, "auth": { - "internal_users": Option([], type=valid_users_list), "internal_type": Option("htpasswd"), - "external_type": Option(""), # "internal": {}, + "external_type": Option(""), # "external": {}, + "internal_users": Option([], type=valid_users_list), }, "info": { diff --git a/kvmd/apps/kvmd/__init__.py b/kvmd/apps/kvmd/__init__.py index 7f5c320e..36ac871d 100644 --- a/kvmd/apps/kvmd/__init__.py +++ b/kvmd/apps/kvmd/__init__.py @@ -46,11 +46,11 @@ def main(argv: Optional[List[str]]=None) -> None: # pylint: disable=protected-access Server( auth_manager=AuthManager( - internal_users=config.auth.internal_users, internal_type=config.auth.internal_type, - external_type=config.auth.external_type, internal=config.auth.internal._unpack(), + external_type=config.auth.external_type, external=(config.auth.external._unpack() if config.auth.external_type else {}), + internal_users=config.auth.internal_users, ), info_manager=InfoManager(**config.info._unpack()), log_reader=LogReader(), diff --git a/kvmd/apps/kvmd/auth.py b/kvmd/apps/kvmd/auth.py index c90f10b5..22724421 100644 --- a/kvmd/apps/kvmd/auth.py +++ b/kvmd/apps/kvmd/auth.py @@ -36,16 +36,16 @@ from ...plugins.auth import get_auth_service_class class AuthManager: def __init__( self, - internal_users: List[str], internal_type: str, - external_type: str, - internal: Dict, + + external_type: str, external: Dict, + + internal_users: List[str], ) -> None: - self.__internal_users = internal_users self.__internal_service = get_auth_service_class(internal_type)(**internal) get_logger().info("Using internal login service %r", self.__internal_service.PLUGIN_NAME) @@ -54,6 +54,8 @@ class AuthManager: self.__external_service = get_auth_service_class(external_type)(**external) get_logger().info("Using external login service %r", self.__external_service.PLUGIN_NAME) + self.__internal_users = internal_users + self.__tokens: Dict[str, str] = {} # {token: user} async def login(self, user: str, passwd: str) -> Optional[str]: diff --git a/testenv/tests/auth/test_manager.py b/testenv/tests/auth/test_manager.py new file mode 100644 index 00000000..26306731 --- /dev/null +++ b/testenv/tests/auth/test_manager.py @@ -0,0 +1,134 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2018 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +import os +import contextlib + +from typing import List +from typing import AsyncGenerator +from typing import Optional + +import passlib.apache + +import pytest + +from kvmd.yamlconf import make_config + +from kvmd.apps.kvmd.auth import AuthManager + +from kvmd.plugins.auth import get_auth_service_class + + +# ===== +async def _get_configured_manager( + internal_path: str, + external_path: str="", + internal_users: Optional[List[str]]=None, +) -> AsyncGenerator[AuthManager, None]: + + internal_class = get_auth_service_class("htpasswd") + internal = make_config({"file": internal_path}, internal_class.get_options())._unpack() # pylint: disable=protected-access + internal.update(internal) + + if external_path: + external_class = get_auth_service_class("htpasswd") + external = make_config({"file": external_path}, external_class.get_options())._unpack() # pylint: disable=protected-access + external.update(external) + + manager = AuthManager( + internal_type="htpasswd", + internal=internal, + external_type=("htpasswd" if external_path else ""), + external=(external if external_path else {}), + internal_users=(internal_users or []), + ) + try: + yield manager + finally: + await manager.cleanup() + + +# ===== +async def test_ok__internal(tmpdir) -> None: # type: ignore + path = os.path.abspath(str(tmpdir.join("htpasswd"))) + + htpasswd = passlib.apache.HtpasswdFile(path, new=True) + htpasswd.set_password("admin", "pass") + htpasswd.save() + + async with _get_configured_manager(path) as manager: + assert manager.check("xxx") is None + manager.logout("xxx") + + assert (await manager.login("user", "foo")) is None + assert (await manager.login("admin", "foo")) is None + assert (await manager.login("user", "pass")) is None + + token = await manager.login("admin", "pass") + assert isinstance(token, str) + assert len(token) == 64 + + again = await manager.login("admin", "pass") + assert token == again + + assert manager.check(token) == "admin" + manager.logout(token) + assert manager.check(token) is None + + again = await manager.login("admin", "pass") + assert token != again + + +async def test_ok__external(tmpdir) -> None: # type: ignore + path1 = os.path.abspath(str(tmpdir.join("htpasswd1"))) + path2 = os.path.abspath(str(tmpdir.join("htpasswd2"))) + + htpasswd1 = passlib.apache.HtpasswdFile(path1, new=True) + htpasswd1.set_password("admin", "pass1") + htpasswd1.set_password("local", "foobar") + htpasswd1.save() + + htpasswd2 = passlib.apache.HtpasswdFile(path2, new=True) + htpasswd2.set_password("admin", "pass2") + htpasswd2.set_password("user", "foobar") + htpasswd2.save() + + async with _get_configured_manager(path1, path2, ["admin"]) as manager: + assert (await manager.login("local", "foobar")) is None + assert (await manager.login("admin", "pass2")) is None + + token = await manager.login("admin", "pass1") + assert token is not None + + assert manager.check(token) == "admin" + manager.logout(token) + assert manager.check(token) is None + + token = await manager.login("user", "foobar") + assert token is not None + + assert manager.check(token) == "user" + manager.logout(token) + assert manager.check(token) is None |