diff options
Diffstat (limited to 'testenv/tests/plugins/auth')
-rw-r--r-- | testenv/tests/plugins/auth/__init__.py | 43 | ||||
-rw-r--r-- | testenv/tests/plugins/auth/test_htpasswd.py | 54 | ||||
-rw-r--r-- | testenv/tests/plugins/auth/test_http.py | 79 |
3 files changed, 176 insertions, 0 deletions
diff --git a/testenv/tests/plugins/auth/__init__.py b/testenv/tests/plugins/auth/__init__.py new file mode 100644 index 00000000..7d0d0fb4 --- /dev/null +++ b/testenv/tests/plugins/auth/__init__.py @@ -0,0 +1,43 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2018 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +import contextlib + +from typing import AsyncGenerator +from typing import Any + +from kvmd.yamlconf import make_config + +from kvmd.plugins.auth import BaseAuthService +from kvmd.plugins.auth import get_auth_service_class + + +# ===== +async def get_configured_auth_service(name: str, **kwargs: Any) -> AsyncGenerator[BaseAuthService, None]: + service_class = get_auth_service_class(name) + config = make_config(kwargs, service_class.get_plugin_options()) + service = service_class(**config._unpack()) # pylint: disable=protected-access + try: + yield service + finally: + await service.cleanup() diff --git a/testenv/tests/plugins/auth/test_htpasswd.py b/testenv/tests/plugins/auth/test_htpasswd.py new file mode 100644 index 00000000..9fe68e6d --- /dev/null +++ b/testenv/tests/plugins/auth/test_htpasswd.py @@ -0,0 +1,54 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2018 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +import os + +import passlib.apache + +import pytest + +from . import get_configured_auth_service + + +# ===== +async def test_ok__htpasswd_service(tmpdir) -> None: # type: ignore + path = os.path.abspath(str(tmpdir.join("htpasswd"))) + + htpasswd = passlib.apache.HtpasswdFile(path, new=True) + htpasswd.set_password("admin", "pass") + htpasswd.save() + + async with get_configured_auth_service("htpasswd", file=path) as service: + assert not (await service.authorize("user", "foo")) + assert not (await service.authorize("admin", "foo")) + assert not (await service.authorize("user", "pass")) + assert (await service.authorize("admin", "pass")) + + htpasswd.set_password("admin", "bar") + htpasswd.set_password("user", "bar") + htpasswd.save() + + assert (await service.authorize("admin", "bar")) + assert (await service.authorize("user", "bar")) + assert not (await service.authorize("admin", "foo")) + assert not (await service.authorize("user", "foo")) diff --git a/testenv/tests/plugins/auth/test_http.py b/testenv/tests/plugins/auth/test_http.py new file mode 100644 index 00000000..6c584dc1 --- /dev/null +++ b/testenv/tests/plugins/auth/test_http.py @@ -0,0 +1,79 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2018 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +from typing import Dict +from typing import AsyncGenerator + +import aiohttp.web +import aiohttp_basicauth + +import pytest + +from . import get_configured_auth_service + + +# ===== +async def _handle_auth(request: aiohttp.web.BaseRequest) -> aiohttp.web.Response: + status = 400 + if request.method == "POST": + credentials = (await request.json()) + if credentials["user"] == "admin" and credentials["passwd"] == "pass": + status = 200 + return aiohttp.web.Response(text=str(status), status=status) + + [email protected](name="auth_server_port") +async def _auth_server_port_fixture(aiohttp_server) -> AsyncGenerator[int, None]: # type: ignore + auth = aiohttp_basicauth.BasicAuthMiddleware( + username="server-admin", + password="server-pass", + force=False, + ) + + app = aiohttp.web.Application(middlewares=[auth]) + app.router.add_post("/auth", _handle_auth) + app.router.add_post("/auth_plus_basic", auth.required(_handle_auth)) + + server = await aiohttp_server(app) + try: + yield server.port + finally: + await server.close() + + +# ===== [email protected]("kwargs", [ + {}, + {"verify": False}, + {"user": "server-admin", "passwd": "server-pass"}, +]) +async def test_ok(auth_server_port: int, kwargs: Dict) -> None: + url = "http://localhost:%d/%s" % ( + auth_server_port, + ("auth_plus_basic" if kwargs.get("user") else "auth"), + ) + async with get_configured_auth_service("http", url=url, **kwargs) as service: + assert not (await service.authorize("user", "foobar")) + assert not (await service.authorize("admin", "foobar")) + assert not (await service.authorize("user", "pass")) + assert (await service.authorize("admin", "pass")) |