feat(auth): harden security across auth and API

- Require reauthentication to update security settings via API
  (API key or username/password; accepted in body or headers)
- Add current_username/current_password/current_api_key to request
  model for secure updates
- Mitigate timing attacks in Basic auth by verifying password using a
  dummy hash when username mismatches; improve failure logging
- Enforce restrictive permissions (0600) on qbm_settings.yml during
  load/save; warn and attempt automatic correction if permissive
- Lock down CORS defaults: no origins allowed, credentials disabled,
  explicit methods/headers only
- Prevent path traversal on config filenames via strict validation and
  resolve checks
- Automatically redact secrets in logs by registering sensitive fields
  (passwords, tokens, keys)
- Redact password_hash and api_key in security settings responses
- Audit log security setting changes and reload middleware on save

BREAKING CHANGE: CORS is now denied by default (no allowed origins,
credentials disabled). Cross-origin clients must be explicitly allowed.
Updating security settings now requires current credentials (API key or
username/password).
This commit is contained in:
bobokun 2025-09-07 13:58:44 -04:00
parent 85520bb224
commit d280787bd1
No known key found for this signature in database
GPG key ID: B73932169607D927
3 changed files with 218 additions and 22 deletions

View file

@ -1 +1 @@
4.6.1-develop17
4.6.1-develop18

View file

@ -2,6 +2,7 @@
import base64
import hashlib
import os
import re
import secrets
from datetime import datetime
@ -75,6 +76,11 @@ class SecuritySettingsRequest(BaseModel):
generate_api_key: bool = False
clear_api_key: bool = False
# Current credentials for reauthentication
current_username: str = ""
current_password: str = ""
current_api_key: str = ""
@validator("username")
def username_must_be_valid(cls, v):
if v and (len(v) < 3 or len(v) > 50):
@ -243,6 +249,23 @@ class AuthenticationMiddleware(BaseHTTPMiddleware):
try:
if self.settings_path.exists():
# Check file permissions for security
try:
stat_info = self.settings_path.stat()
# Check if file is readable/writable by group or others (should be 600 or similar)
if stat_info.st_mode & 0o077: # Check if group/other has any permissions
logger.warning(
f"Settings file {self.settings_path} has overly permissive permissions. "
f"Fixing to 600 (owner read/write only)."
)
try:
os.chmod(str(self.settings_path), 0o600)
logger.info(f"Fixed permissions for settings file {self.settings_path} to 600.")
except OSError as e:
logger.error(f"Could not fix permissions for settings file: {e}")
except (OSError, AttributeError) as e:
logger.warning(f"Could not check permissions for settings file: {e}")
with open(self.settings_path, encoding="utf-8") as f:
data = ruamel.yaml.YAML().load(f) or {}
@ -363,14 +386,25 @@ class AuthenticationMiddleware(BaseHTTPMiddleware):
decoded_credentials = base64.b64decode(encoded_credentials).decode("utf-8")
username, password = decoded_credentials.split(":", 1)
# Verify credentials
if username == settings.username and verify_password(password, settings.password_hash):
return await call_next(request)
# Verify credentials with constant-time comparison to prevent timing attacks
if username == settings.username:
# Username matches, verify password
if verify_password(password, settings.password_hash):
return await call_next(request)
else:
logger.debug("Basic auth: invalid password")
else:
logger.debug("Basic auth: invalid credentials")
# Record failed authentication attempt
record_auth_attempt(request)
return self._auth_challenge_response()
# Username doesn't match, but still verify password against dummy hash for constant time
dummy_hash = (
"$argon2id$v=19$m=65536,t=3,p=4$pSFTuqiWsyxxjLN0fuXtrw$"
"MvnRW8OTPn6ED7sun2PvdxqJuYDLvog+OpEmA+Y4eSQ"
) # Dummy hash
verify_password(password, dummy_hash) # This will always fail but takes constant time
logger.debug("Basic auth: invalid username")
# Record failed authentication attempt
record_auth_attempt(request)
return self._auth_challenge_response()
except Exception:
logger.debug("Basic auth: error processing credentials")
@ -449,6 +483,12 @@ def save_auth_settings(settings_path: Path, settings: AuthSettings) -> bool:
with open(settings_path, "w", encoding="utf-8") as f:
yaml.dump(data, f)
# Set restrictive permissions (600 - owner read/write only)
try:
os.chmod(str(settings_path), 0o600)
except OSError as e:
logger.error(f"Could not set permissions for settings file: {e}")
return True
except Exception as e:
logger.error(f"Error saving authentication settings: {e}")

View file

@ -44,10 +44,12 @@ from modules import util
from modules.auth import AuthenticationMiddleware
from modules.auth import AuthSettings
from modules.auth import SecuritySettingsRequest
from modules.auth import authenticate_user
from modules.auth import generate_api_key
from modules.auth import hash_password
from modules.auth import load_auth_settings
from modules.auth import save_auth_settings
from modules.auth import verify_api_key
from modules.config import Config
from modules.scheduler import Scheduler
from modules.util import YAML
@ -265,13 +267,13 @@ class WebAPI:
object.__setattr__(self, "logs_path", Path(self.default_dir) / "logs")
object.__setattr__(self, "backup_path", Path(self.default_dir) / ".backups")
# Configure CORS
# Configure CORS - restrict to prevent unauthorized cross-origin access
self.app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
allow_origins=[], # No cross-origin requests allowed by default
allow_credentials=False, # Disable credentials to prevent CSRF via CORS
allow_methods=["GET", "POST", "PUT", "DELETE"], # Explicit allowed methods
allow_headers=["Authorization", "Content-Type", "X-API-Key"], # Explicit allowed headers
)
# Configure Rate Limiting
@ -740,11 +742,12 @@ class WebAPI:
async def get_config(self, filename: str) -> ConfigResponse:
"""Get a specific configuration file."""
try:
# Validate filename to prevent path traversal
config_file_path = self._validate_config_filename(filename)
# Define sensitive configuration files that should not be accessible via API
sensitive_files = {"qbm_settings.yml"}
config_file_path = self.config_path / filename
if not config_file_path.exists():
raise HTTPException(status_code=404, detail=f"Configuration file '{filename}' not found")
@ -778,7 +781,8 @@ class WebAPI:
async def create_config(self, filename: str, request: ConfigRequest) -> dict:
"""Create a new configuration file."""
try:
config_file_path = self.config_path / filename
# Validate filename to prevent path traversal
config_file_path = self._validate_config_filename(filename)
if config_file_path.exists():
raise HTTPException(status_code=409, detail=f"Configuration file '{filename}' already exists")
@ -799,7 +803,8 @@ class WebAPI:
async def update_config(self, filename: str, request: ConfigRequest) -> dict:
"""Update an existing configuration file."""
try:
config_file_path = self.config_path / filename
# Validate filename to prevent path traversal
config_file_path = self._validate_config_filename(filename)
if not config_file_path.exists():
raise HTTPException(status_code=404, detail=f"Configuration file '{filename}' not found")
@ -807,13 +812,16 @@ class WebAPI:
# Create backup
await self._create_backup(config_file_path)
# Debug: Log what we received from frontend
# Register sensitive fields as secrets for automatic redaction in logs
self._register_sensitive_fields_as_secrets(request.data)
# Debug: Log what we received from frontend (secrets will be automatically redacted)
logger.trace(f"[DEBUG] Raw data received from frontend: {json.dumps(request.data, indent=2, default=str)}")
# Convert !ENV syntax back to EnvStr objects for proper YAML serialization
config_data_for_save = self._restore_env_objects(request.data)
# Debug: Log what we have after restoration
# Debug: Log what we have after restoration (secrets will be automatically redacted)
logger.trace(f"[DEBUG] Data after _restore_env_objects: {json.dumps(config_data_for_save, indent=2, default=str)}")
# Write updated YAML file
@ -829,7 +837,8 @@ class WebAPI:
async def delete_config(self, filename: str) -> dict:
"""Delete a configuration file."""
try:
config_file_path = self.config_path / filename
# Validate filename to prevent path traversal
config_file_path = self._validate_config_filename(filename)
if not config_file_path.exists():
raise HTTPException(status_code=404, detail=f"Configuration file '{filename}' not found")
@ -1370,6 +1379,46 @@ class WebAPI:
elif isinstance(item, (dict, list)):
self._log_env_str_values(item, current_path)
def _validate_config_filename(self, filename: str) -> Path:
"""Validate filename and return safe path to prevent path traversal attacks."""
# Reject empty or None filenames
if not filename or not isinstance(filename, str):
raise HTTPException(status_code=400, detail="Invalid filename")
# Reject filenames with path separators or starting with dot
if "/" in filename or "\\" in filename or filename.startswith("."):
raise HTTPException(status_code=400, detail="Invalid filename: path traversal detected")
# Enforce filename pattern (alphanumeric, underscore, hyphen, dot only)
if not re.match(r"^[A-Za-z0-9_.-]{1,64}$", filename):
raise HTTPException(status_code=400, detail="Invalid filename: contains invalid characters")
# Resolve the path and ensure it stays within config directory
config_file_path = self.config_path / filename
try:
# Use resolve() to get absolute path and check if it's within config_path
resolved_path = config_file_path.resolve()
if not resolved_path.is_relative_to(self.config_path):
raise HTTPException(status_code=403, detail="Access denied: path outside config directory")
except (OSError, ValueError):
raise HTTPException(status_code=400, detail="Invalid filename")
return config_file_path
def _register_sensitive_fields_as_secrets(self, data):
"""Register sensitive fields as secrets for automatic redaction in logs."""
if isinstance(data, dict):
for key, value in data.items():
if key.lower() in ["password", "password_hash", "api_key", "secret", "token", "key"]:
if isinstance(value, str) and value:
logger.secret(value)
else:
self._register_sensitive_fields_as_secrets(value)
elif isinstance(data, list):
for item in data:
self._register_sensitive_fields_as_secrets(item)
async def get_scheduler_status(self) -> dict:
"""Get complete scheduler status including schedule configuration and persistence information."""
try:
@ -1616,20 +1665,103 @@ class WebAPI:
settings_path = Path(self.default_dir) / "qbm_settings.yml"
settings = load_auth_settings(settings_path)
# Don't return the password hash for security
# Don't return sensitive information for security
settings.password_hash = "***" if settings.password_hash else ""
settings.api_key = "***" if settings.api_key else ""
return settings
except Exception as e:
logger.error(f"Error getting security settings: {str(e)}")
return AuthSettings()
async def update_security_settings(self, request: SecuritySettingsRequest) -> dict[str, Any]:
async def update_security_settings(self, request: SecuritySettingsRequest, req: Request) -> dict[str, Any]:
"""Update security settings."""
try:
settings_path = Path(self.default_dir) / "qbm_settings.yml"
current_settings = load_auth_settings(settings_path)
# Capture original values before any modifications for audit logging
original_settings = {
"enabled": current_settings.enabled,
"method": current_settings.method,
"bypass_auth_for_local": current_settings.bypass_auth_for_local,
"username": current_settings.username,
"api_key": current_settings.api_key,
}
# DEBUG: Log the request data to understand what's being sent
logger.trace(
f"Security settings update request: current_api_key={bool(request.current_api_key)}, "
f"current_username={bool(request.current_username)}, "
f"current_password={bool(request.current_password)}"
)
logger.trace(
f"Current settings: method={current_settings.method}, "
f"has_api_key={bool(current_settings.api_key)}, "
f"has_username={bool(current_settings.username)}"
)
# Verify current credentials for reauthentication
auth_verified = False
# First, try credentials provided in the request body
# Try API key verification first
if request.current_api_key and current_settings.api_key:
logger.trace("Attempting API key verification from request body")
if verify_api_key(request.current_api_key, current_settings.api_key):
auth_verified = True
logger.trace("API key verification successful")
else:
logger.trace("API key verification failed")
return {"success": False, "message": "Invalid current API key"}
# If API key not provided or invalid, try username/password
if not auth_verified and request.current_username and request.current_password:
logger.trace("Attempting username/password verification from request body")
if authenticate_user(request.current_username, request.current_password, current_settings):
auth_verified = True
logger.trace("Username/password verification successful")
else:
logger.trace("Username/password verification failed")
return {"success": False, "message": "Invalid current username or password"}
# If no credentials in request body, try to extract from request headers
if not auth_verified:
logger.trace("No credentials in request body, attempting to extract from headers")
# Try API key from header
api_key_header = req.headers.get("X-API-Key")
if api_key_header and current_settings.api_key:
logger.trace("Attempting API key verification from X-API-Key header")
if verify_api_key(api_key_header, current_settings.api_key):
auth_verified = True
logger.trace("API key verification from header successful")
# If API key header didn't work, try Basic auth from Authorization header
if not auth_verified:
auth_header = req.headers.get("Authorization")
if auth_header and auth_header.startswith("Basic "):
logger.trace("Attempting Basic auth verification from Authorization header")
try:
import base64
encoded_credentials = auth_header.split(" ")[1]
decoded_credentials = base64.b64decode(encoded_credentials).decode("utf-8")
username, password = decoded_credentials.split(":", 1)
if authenticate_user(username, password, current_settings):
auth_verified = True
logger.trace("Basic auth verification from header successful")
else:
logger.trace("Basic auth verification from header failed")
except Exception as e:
logger.warning(f"Error parsing Basic auth header: {e}")
# If neither method worked, require authentication
if not auth_verified:
logger.warning("No valid current credentials provided for security settings update")
return {"success": False, "message": "Current credentials required to update security settings"}
# Update settings
current_settings.enabled = request.enabled
current_settings.method = request.method
@ -1652,6 +1784,30 @@ class WebAPI:
if save_auth_settings(settings_path, current_settings):
# Force reload authentication settings to ensure immediate effect
AuthenticationMiddleware.force_reload_all_settings()
# Audit log the security changes by comparing original with updated values
changes = []
if original_settings["enabled"] != current_settings.enabled:
changes.append(f"enabled: {original_settings['enabled']} -> {current_settings.enabled}")
if original_settings["method"] != current_settings.method:
changes.append(f"method: {original_settings['method']} -> {current_settings.method}")
if original_settings["bypass_auth_for_local"] != current_settings.bypass_auth_for_local:
changes.append(
f"bypass_auth_for_local: {original_settings['bypass_auth_for_local']} -> "
f"{current_settings.bypass_auth_for_local}"
)
if original_settings["username"] != current_settings.username:
changes.append(f"username: {original_settings['username']} -> {current_settings.username}")
if request.password:
changes.append("password: [CHANGED]")
if request.generate_api_key:
changes.append("api_key: [GENERATED]")
if request.clear_api_key:
changes.append("api_key: [CLEARED]")
if changes:
logger.info(f"Security settings updated: {', '.join(changes)}")
logger.info("Authentication settings reloaded after update")
return {
"success": True,