warpgate/tests/test_http_user_auth_ticket.py
2022-11-11 17:00:23 +01:00

132 lines
4.2 KiB
Python

import requests
from uuid import uuid4
from .api_client import (
api_admin_session,
api_create_target,
api_create_user,
api_create_role,
api_add_role_to_user,
api_create_ticket,
api_add_role_to_target,
)
from .conftest import WarpgateProcess
from .test_http_common import * # noqa
class TestHTTPUserAuthTicket:
def test_auth_password_success(
self,
echo_server_port,
shared_wg: WarpgateProcess,
):
url = f"https://localhost:{shared_wg.http_port}"
with api_admin_session(url) as session:
role = api_create_role(url, session, {"name": f"role-{uuid4()}"})
user = api_create_user(
url,
session,
{
"username": f"user-{uuid4()}",
"credentials": [
{
"kind": "Password",
"hash": "123",
},
],
},
)
api_add_role_to_user(url, session, user["id"], role["id"])
echo_target = api_create_target(
url,
session,
{
"name": f"echo-{uuid4()}",
"options": {
"kind": "Http",
"url": f"http://localhost:{echo_server_port}",
"tls": {
"mode": "Disabled",
"verify": False,
},
},
},
)
other_target = api_create_target(
url,
session,
{
"name": f"other-{uuid4()}",
"options": {
"kind": "Http",
"url": "http://badhost",
"tls": {
"mode": "Disabled",
"verify": False,
},
},
},
)
api_add_role_to_target(url, session, echo_target["id"], role["id"])
secret = api_create_ticket(
url, session, user["username"], echo_target["name"]
)
# ---
session = requests.Session()
session.verify = False
response = session.get(
f"{url}/some/path?warpgate-target={echo_target['name']}",
allow_redirects=False,
)
assert response.status_code // 100 != 2
# Ticket as a header
response = session.get(
f"{url}/some/path?warpgate-target={echo_target['name']}",
allow_redirects=False,
headers={
"Authorization": f"Warpgate {secret}",
},
)
assert response.status_code // 100 == 2
assert response.json()["path"] == "/some/path"
# Bad ticket
response = session.get(
f"{url}/some/path?warpgate-target={echo_target['name']}",
allow_redirects=False,
headers={
"Authorization": f"Warpgate bad{secret}",
},
)
assert response.status_code // 100 != 2
# Ticket as a GET param
session = requests.Session()
session.verify = False
response = session.get(
f"{url}/some/path?warpgate-ticket={secret}",
allow_redirects=False,
)
assert response.status_code // 100 == 2
assert response.json()["path"] == "/some/path"
# Ensure no access to other targets
session = requests.Session()
session.verify = False
response = session.get(
f"{url}/some/path?warpgate-ticket={secret}&warpgate-target=admin",
allow_redirects=False,
)
assert response.status_code // 100 == 2
assert response.json()["path"] == "/some/path"
response = session.get(
f"{url}/some/path?warpgate-ticket={secret}&warpgate-target={other_target['name']}",
allow_redirects=False,
)
assert response.status_code // 100 == 2
assert response.json()["path"] == "/some/path"