diff options
Diffstat (limited to 'testenv/tests/plugins')
-rw-r--r-- | testenv/tests/plugins/auth/test_pam.py | 92 |
1 files changed, 92 insertions, 0 deletions
diff --git a/testenv/tests/plugins/auth/test_pam.py b/testenv/tests/plugins/auth/test_pam.py new file mode 100644 index 00000000..e4171d35 --- /dev/null +++ b/testenv/tests/plugins/auth/test_pam.py @@ -0,0 +1,92 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2018 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +import asyncio +import signal +import pwd + +from typing import Dict +from typing import AsyncGenerator +from typing import Optional + +import pytest + +from . import get_configured_auth_service + + +# ===== +_UID = 1500 +_USER = "foobar" +_PASSWD = "query" + + +# ===== +async def _run_process(cmd: str, input: Optional[str]=None) -> None: # pylint: disable=redefined-builtin + proc = await asyncio.create_subprocess_exec( + *cmd.split(" "), + stdin=(asyncio.subprocess.PIPE if input is not None else None), + preexec_fn=(lambda: signal.signal(signal.SIGINT, signal.SIG_IGN)), + ) + await proc.communicate(input.encode() if input is not None else None) + assert proc.returncode == 0 + + [email protected](name="test_user") +async def _test_user() -> AsyncGenerator[None, None]: + with pytest.raises(KeyError): + pwd.getpwnam(_USER) + await _run_process(f"useradd -u {_UID} -s /bin/bash {_USER}") + await _run_process("chpasswd", input=f"{_USER}:{_PASSWD}\n") + + assert pwd.getpwnam(_USER).pw_uid == _UID + + try: + yield + finally: + await _run_process(f"userdel -r {_USER}") + with pytest.raises(KeyError): + pwd.getpwnam(_USER) + + +# ===== [email protected]("kwargs", [ + {}, + {"allow_users": [_USER]}, + {"allow_uids_at": _UID}, +]) +async def test_ok(test_user, kwargs: Dict) -> None: # type: ignore + async with get_configured_auth_service("pam", **kwargs) as service: + assert not (await service.authorize(_USER, "invalid_password")) + assert (await service.authorize(_USER, _PASSWD)) + + [email protected]("kwargs", [ + {"allow_users": ["root"]}, + {"deny_users": [_USER]}, + {"allow_uids_at": _UID + 1}, +]) +async def test_fail(test_user, kwargs: Dict) -> None: # type: ignore + async with get_configured_auth_service("pam", **kwargs) as service: + assert not (await service.authorize(_USER, "invalid_password")) + assert not (await service.authorize(_USER, _PASSWD)) |