summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDevaev Maxim <[email protected]>2019-04-11 05:36:38 +0300
committerDevaev Maxim <[email protected]>2019-04-11 05:36:38 +0300
commita168ce9d8f90470b80ae265021bfdd7570bef530 (patch)
tree7bbe0c54b9e4c1cfc03002fdfd917ca00c2092b8
parent060140d6540d045530ceefda6e639f39588e7dab (diff)
better auth testing
-rw-r--r--kvmd/plugins/auth/http.py32
-rw-r--r--testenv/tox.ini2
-rw-r--r--tests/auth/test_service_htpasswd.py6
-rw-r--r--tests/auth/test_service_http.py39
4 files changed, 49 insertions, 30 deletions
diff --git a/kvmd/plugins/auth/http.py b/kvmd/plugins/auth/http.py
index 16c6500d..18ecaae6 100644
--- a/kvmd/plugins/auth/http.py
+++ b/kvmd/plugins/auth/http.py
@@ -46,7 +46,6 @@ class Plugin(BaseAuthService):
self,
url: str,
verify: bool,
- post: bool,
user: str,
passwd: str,
timeout: float,
@@ -54,7 +53,6 @@ class Plugin(BaseAuthService):
self.__url = url
self.__verify = verify
- self.__post = post
self.__user = user
self.__passwd = passwd
self.__timeout = timeout
@@ -64,31 +62,29 @@ class Plugin(BaseAuthService):
@classmethod
def get_options(cls) -> Dict[str, Option]:
return {
- "url": Option("http://localhost/auth_post"),
+ "url": Option("http://localhost/auth"),
"verify": Option(True, type=valid_bool),
- "post": Option(True, type=valid_bool),
"user": Option(""),
"passwd": Option(""),
"timeout": Option(5.0, type=valid_float_f01),
}
async def login(self, user: str, passwd: str) -> bool:
- kwargs: Dict = {
- "method": "GET",
- "url": self.__url,
- "timeout": self.__timeout,
- "headers": {
- "User-Agent": "KVMD/%s" % (__version__),
- "X-KVMD-User": user,
- },
- }
- if self.__post:
- kwargs["method"] = "POST"
- kwargs["json"] = {"user": user, "passwd": passwd}
-
session = self.__ensure_session()
try:
- async with session.request(**kwargs) as response:
+ async with session.request(
+ method="POST",
+ url=self.__url,
+ timeout=self.__timeout,
+ json={
+ "user": user,
+ "passwd": passwd
+ },
+ headers={
+ "User-Agent": "KVMD/%s" % (__version__),
+ "X-KVMD-User": user,
+ },
+ ) as response:
response.raise_for_status()
assert response.status == 200
return True
diff --git a/testenv/tox.ini b/testenv/tox.ini
index 2e4ece8a..3be89343 100644
--- a/testenv/tox.ini
+++ b/testenv/tox.ini
@@ -19,6 +19,7 @@ deps =
pylint
pytest
pytest-asyncio
+ aiohttp-basicauth
-rrequirements.txt
[testenv:mypy]
@@ -41,6 +42,7 @@ deps =
pytest-mock
pytest-asyncio
pytest-aiohttp
+ aiohttp-basicauth
-rrequirements.txt
[testenv:eslint]
diff --git a/tests/auth/test_service_htpasswd.py b/tests/auth/test_service_htpasswd.py
index 9a24378e..6378d896 100644
--- a/tests/auth/test_service_htpasswd.py
+++ b/tests/auth/test_service_htpasswd.py
@@ -35,12 +35,14 @@ async def test_ok__htpasswd_service(tmpdir) -> None: # type: ignore
path = os.path.abspath(str(tmpdir.join("htpasswd")))
htpasswd = passlib.apache.HtpasswdFile(path, new=True)
- htpasswd.set_password("admin", "foo")
+ htpasswd.set_password("admin", "pass")
htpasswd.save()
async with get_configured_auth_service("htpasswd", file=path) as service:
- assert (await service.login("admin", "foo"))
assert not (await service.login("user", "foo"))
+ assert not (await service.login("admin", "foo"))
+ assert not (await service.login("user", "pass"))
+ assert (await service.login("admin", "pass"))
htpasswd.set_password("admin", "bar")
htpasswd.set_password("user", "bar")
diff --git a/tests/auth/test_service_http.py b/tests/auth/test_service_http.py
index 6390f368..c6f2b631 100644
--- a/tests/auth/test_service_http.py
+++ b/tests/auth/test_service_http.py
@@ -20,9 +20,11 @@
# ========================================================================== #
+from typing import Dict
from typing import AsyncGenerator
import aiohttp.web
+import aiohttp_basicauth
import pytest
@@ -30,19 +32,27 @@ from . import get_configured_auth_service
# =====
-async def _handle_auth_post(request: aiohttp.web.BaseRequest) -> aiohttp.web.Response:
+async def _handle_auth(request: aiohttp.web.BaseRequest) -> aiohttp.web.Response:
status = 400
if request.method == "POST":
credentials = (await request.json())
- if credentials["user"] == "admin" and credentials["passwd"] == "foobar":
+ if credentials["user"] == "admin" and credentials["passwd"] == "pass":
status = 200
return aiohttp.web.Response(text=str(status), status=status)
@pytest.fixture(name="auth_server_port")
async def _auth_server_port_fixture(aiohttp_server) -> AsyncGenerator[int, None]: # type: ignore
- app = aiohttp.web.Application()
- app.router.add_post("/auth_post", _handle_auth_post)
+ auth = aiohttp_basicauth.BasicAuthMiddleware(
+ username="server-admin",
+ password="server-pass",
+ force=False,
+ )
+
+ app = aiohttp.web.Application(middlewares=[auth])
+ app.router.add_post("/auth", _handle_auth)
+ app.router.add_post("/auth_plus_basic", auth.required(_handle_auth))
+
server = await aiohttp_server(app)
try:
yield server.port
@@ -52,9 +62,18 @@ async def _auth_server_port_fixture(aiohttp_server) -> AsyncGenerator[int, None]
# =====
@pytest.mark.asyncio
-async def test_ok__http_service(auth_server_port: int) -> None:
- url = "http://localhost:%d/auth_post" % (auth_server_port)
- async with get_configured_auth_service("http", url=url) as service:
- assert not (await service.login("admin", "foo"))
- assert not (await service.login("user", "foo"))
- assert (await service.login("admin", "foobar"))
[email protected]("kwargs", [
+ {},
+ {"verify": False},
+ {"user": "server-admin", "passwd": "server-pass"},
+])
+async def test_ok(auth_server_port: int, kwargs: Dict) -> None:
+ url = "http://localhost:%d/%s" % (
+ auth_server_port,
+ ("auth_plus_basic" if kwargs.get("user") else "auth"),
+ )
+ async with get_configured_auth_service("http", url=url, **kwargs) as service:
+ assert not (await service.login("user", "foobar"))
+ assert not (await service.login("admin", "foobar"))
+ assert not (await service.login("user", "pass"))
+ assert (await service.login("admin", "pass"))