warpgate/tests/test_ssh_user_auth_otp.py
2025-05-23 17:02:20 +02:00

131 lines
4.4 KiB
Python

from asyncio import subprocess
from base64 import b64decode
from uuid import uuid4
import pyotp
import pytest
from pathlib import Path
from textwrap import dedent
from .api_client import admin_client, sdk
from .conftest import ProcessManager, WarpgateProcess
from .util import wait_port
class Test:
def test_otp(
self,
processes: ProcessManager,
wg_c_ed25519_pubkey: Path,
otp_key_base32: str,
otp_key_base64: str,
timeout,
shared_wg: WarpgateProcess,
):
ssh_port = processes.start_ssh_server(
trusted_keys=[wg_c_ed25519_pubkey.read_text()]
)
wait_port(ssh_port)
url = f"https://localhost:{shared_wg.http_port}"
with admin_client(url) as api:
role = api.create_role(
sdk.RoleDataRequest(name=f"role-{uuid4()}"),
)
user = api.create_user(sdk.CreateUserRequest(username=f"user-{uuid4()}"))
api.create_public_key_credential(
user.id,
sdk.NewPublicKeyCredential(
label="Public Key",
openssh_public_key=open("ssh-keys/id_ed25519.pub").read().strip(),
),
)
api.create_otp_credential(
user.id,
sdk.NewOtpCredential(
secret_key=list(b64decode(otp_key_base64)),
),
)
api.update_user(
user.id,
sdk.UserDataRequest(
username=user.username,
credential_policy=sdk.UserRequireCredentialsPolicy(
ssh=["PublicKey", "Totp"],
),
),
)
api.add_user_role(user.id, role.id)
ssh_target = api.create_target(
sdk.TargetDataRequest(
name=f"ssh-{uuid4()}",
options=sdk.TargetOptions(
sdk.TargetOptionsTargetSSHOptions(
kind="Ssh",
host="localhost",
port=ssh_port,
username="root",
auth=sdk.SSHTargetAuth(
sdk.SSHTargetAuthSshTargetPublicKeyAuth(
kind="PublicKey"
)
),
)
),
)
)
api.add_target_role(ssh_target.id, role.id)
totp = pyotp.TOTP(otp_key_base32)
script = dedent(
f"""
set timeout {timeout - 5}
spawn ssh {user.username}:{ssh_target.name}@localhost -p {shared_wg.ssh_port} -o StrictHostKeychecking=no -o UserKnownHostsFile=/dev/null -o IdentitiesOnly=yes -o IdentityFile=ssh-keys/id_ed25519 -o PreferredAuthentications=publickey,keyboard-interactive ls /bin/sh
expect "Two-factor authentication"
sleep 0.5
send "{totp.now()}\\r"
expect {{
"/bin/sh" {{ exit 0; }}
eof {{ exit 1; }}
}}
"""
)
ssh_client = processes.start(
["expect"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
output, stderr = ssh_client.communicate(script.encode(), timeout=timeout)
assert ssh_client.returncode == 0, output + stderr
script = dedent(
f"""
set timeout {timeout - 5}
spawn ssh {user.username}:{ssh_target.name}@localhost -p {[shared_wg.ssh_port]} -o StrictHostKeychecking=no -o UserKnownHostsFile=/dev/null -o IdentitiesOnly=yes -o IdentityFile=ssh-keys/id_ed25519 -o PreferredAuthentications=publickey,keyboard-interactive ls /bin/sh
expect "Two-factor authentication"
sleep 0.5
send "12345678\\r"
expect {{
"/bin/sh" {{ exit 0; }}
"Two-factor authentication" {{ exit 1; }}
eof {{ exit 1; }}
}}
"""
)
ssh_client = processes.start(
["expect"], stdin=subprocess.PIPE, stdout=subprocess.PIPE
)
output = ssh_client.communicate(script.encode(), timeout=timeout)[0]
assert ssh_client.returncode != 0, output