From d280787bd16b8100c41d19894e20ae98fe860416 Mon Sep 17 00:00:00 2001 From: bobokun Date: Sun, 7 Sep 2025 13:58:44 -0400 Subject: [PATCH] feat(auth): harden security across auth and API - Require reauthentication to update security settings via API (API key or username/password; accepted in body or headers) - Add current_username/current_password/current_api_key to request model for secure updates - Mitigate timing attacks in Basic auth by verifying password using a dummy hash when username mismatches; improve failure logging - Enforce restrictive permissions (0600) on qbm_settings.yml during load/save; warn and attempt automatic correction if permissive - Lock down CORS defaults: no origins allowed, credentials disabled, explicit methods/headers only - Prevent path traversal on config filenames via strict validation and resolve checks - Automatically redact secrets in logs by registering sensitive fields (passwords, tokens, keys) - Redact password_hash and api_key in security settings responses - Audit log security setting changes and reload middleware on save BREAKING CHANGE: CORS is now denied by default (no allowed origins, credentials disabled). Cross-origin clients must be explicitly allowed. Updating security settings now requires current credentials (API key or username/password). --- VERSION | 2 +- modules/auth.py | 54 +++++++++++-- modules/web_api.py | 184 +++++++++++++++++++++++++++++++++++++++++---- 3 files changed, 218 insertions(+), 22 deletions(-) diff --git a/VERSION b/VERSION index d5942b7..232561b 100755 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -4.6.1-develop17 +4.6.1-develop18 diff --git a/modules/auth.py b/modules/auth.py index f4372af..d082e3f 100644 --- a/modules/auth.py +++ b/modules/auth.py @@ -2,6 +2,7 @@ import base64 import hashlib +import os import re import secrets from datetime import datetime @@ -75,6 +76,11 @@ class SecuritySettingsRequest(BaseModel): generate_api_key: bool = False clear_api_key: bool = False + # Current credentials for reauthentication + current_username: str = "" + current_password: str = "" + current_api_key: str = "" + @validator("username") def username_must_be_valid(cls, v): if v and (len(v) < 3 or len(v) > 50): @@ -243,6 +249,23 @@ class AuthenticationMiddleware(BaseHTTPMiddleware): try: if self.settings_path.exists(): + # Check file permissions for security + try: + stat_info = self.settings_path.stat() + # Check if file is readable/writable by group or others (should be 600 or similar) + if stat_info.st_mode & 0o077: # Check if group/other has any permissions + logger.warning( + f"Settings file {self.settings_path} has overly permissive permissions. " + f"Fixing to 600 (owner read/write only)." + ) + try: + os.chmod(str(self.settings_path), 0o600) + logger.info(f"Fixed permissions for settings file {self.settings_path} to 600.") + except OSError as e: + logger.error(f"Could not fix permissions for settings file: {e}") + except (OSError, AttributeError) as e: + logger.warning(f"Could not check permissions for settings file: {e}") + with open(self.settings_path, encoding="utf-8") as f: data = ruamel.yaml.YAML().load(f) or {} @@ -363,14 +386,25 @@ class AuthenticationMiddleware(BaseHTTPMiddleware): decoded_credentials = base64.b64decode(encoded_credentials).decode("utf-8") username, password = decoded_credentials.split(":", 1) - # Verify credentials - if username == settings.username and verify_password(password, settings.password_hash): - return await call_next(request) + # Verify credentials with constant-time comparison to prevent timing attacks + if username == settings.username: + # Username matches, verify password + if verify_password(password, settings.password_hash): + return await call_next(request) + else: + logger.debug("Basic auth: invalid password") else: - logger.debug("Basic auth: invalid credentials") - # Record failed authentication attempt - record_auth_attempt(request) - return self._auth_challenge_response() + # Username doesn't match, but still verify password against dummy hash for constant time + dummy_hash = ( + "$argon2id$v=19$m=65536,t=3,p=4$pSFTuqiWsyxxjLN0fuXtrw$" + "MvnRW8OTPn6ED7sun2PvdxqJuYDLvog+OpEmA+Y4eSQ" + ) # Dummy hash + verify_password(password, dummy_hash) # This will always fail but takes constant time + logger.debug("Basic auth: invalid username") + + # Record failed authentication attempt + record_auth_attempt(request) + return self._auth_challenge_response() except Exception: logger.debug("Basic auth: error processing credentials") @@ -449,6 +483,12 @@ def save_auth_settings(settings_path: Path, settings: AuthSettings) -> bool: with open(settings_path, "w", encoding="utf-8") as f: yaml.dump(data, f) + # Set restrictive permissions (600 - owner read/write only) + try: + os.chmod(str(settings_path), 0o600) + except OSError as e: + logger.error(f"Could not set permissions for settings file: {e}") + return True except Exception as e: logger.error(f"Error saving authentication settings: {e}") diff --git a/modules/web_api.py b/modules/web_api.py index a1ac953..821d2c4 100755 --- a/modules/web_api.py +++ b/modules/web_api.py @@ -44,10 +44,12 @@ from modules import util from modules.auth import AuthenticationMiddleware from modules.auth import AuthSettings from modules.auth import SecuritySettingsRequest +from modules.auth import authenticate_user from modules.auth import generate_api_key from modules.auth import hash_password from modules.auth import load_auth_settings from modules.auth import save_auth_settings +from modules.auth import verify_api_key from modules.config import Config from modules.scheduler import Scheduler from modules.util import YAML @@ -265,13 +267,13 @@ class WebAPI: object.__setattr__(self, "logs_path", Path(self.default_dir) / "logs") object.__setattr__(self, "backup_path", Path(self.default_dir) / ".backups") - # Configure CORS + # Configure CORS - restrict to prevent unauthorized cross-origin access self.app.add_middleware( CORSMiddleware, - allow_origins=["*"], - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], + allow_origins=[], # No cross-origin requests allowed by default + allow_credentials=False, # Disable credentials to prevent CSRF via CORS + allow_methods=["GET", "POST", "PUT", "DELETE"], # Explicit allowed methods + allow_headers=["Authorization", "Content-Type", "X-API-Key"], # Explicit allowed headers ) # Configure Rate Limiting @@ -740,11 +742,12 @@ class WebAPI: async def get_config(self, filename: str) -> ConfigResponse: """Get a specific configuration file.""" try: + # Validate filename to prevent path traversal + config_file_path = self._validate_config_filename(filename) + # Define sensitive configuration files that should not be accessible via API sensitive_files = {"qbm_settings.yml"} - config_file_path = self.config_path / filename - if not config_file_path.exists(): raise HTTPException(status_code=404, detail=f"Configuration file '{filename}' not found") @@ -778,7 +781,8 @@ class WebAPI: async def create_config(self, filename: str, request: ConfigRequest) -> dict: """Create a new configuration file.""" try: - config_file_path = self.config_path / filename + # Validate filename to prevent path traversal + config_file_path = self._validate_config_filename(filename) if config_file_path.exists(): raise HTTPException(status_code=409, detail=f"Configuration file '{filename}' already exists") @@ -799,7 +803,8 @@ class WebAPI: async def update_config(self, filename: str, request: ConfigRequest) -> dict: """Update an existing configuration file.""" try: - config_file_path = self.config_path / filename + # Validate filename to prevent path traversal + config_file_path = self._validate_config_filename(filename) if not config_file_path.exists(): raise HTTPException(status_code=404, detail=f"Configuration file '{filename}' not found") @@ -807,13 +812,16 @@ class WebAPI: # Create backup await self._create_backup(config_file_path) - # Debug: Log what we received from frontend + # Register sensitive fields as secrets for automatic redaction in logs + self._register_sensitive_fields_as_secrets(request.data) + + # Debug: Log what we received from frontend (secrets will be automatically redacted) logger.trace(f"[DEBUG] Raw data received from frontend: {json.dumps(request.data, indent=2, default=str)}") # Convert !ENV syntax back to EnvStr objects for proper YAML serialization config_data_for_save = self._restore_env_objects(request.data) - # Debug: Log what we have after restoration + # Debug: Log what we have after restoration (secrets will be automatically redacted) logger.trace(f"[DEBUG] Data after _restore_env_objects: {json.dumps(config_data_for_save, indent=2, default=str)}") # Write updated YAML file @@ -829,7 +837,8 @@ class WebAPI: async def delete_config(self, filename: str) -> dict: """Delete a configuration file.""" try: - config_file_path = self.config_path / filename + # Validate filename to prevent path traversal + config_file_path = self._validate_config_filename(filename) if not config_file_path.exists(): raise HTTPException(status_code=404, detail=f"Configuration file '{filename}' not found") @@ -1370,6 +1379,46 @@ class WebAPI: elif isinstance(item, (dict, list)): self._log_env_str_values(item, current_path) + def _validate_config_filename(self, filename: str) -> Path: + """Validate filename and return safe path to prevent path traversal attacks.""" + + # Reject empty or None filenames + if not filename or not isinstance(filename, str): + raise HTTPException(status_code=400, detail="Invalid filename") + + # Reject filenames with path separators or starting with dot + if "/" in filename or "\\" in filename or filename.startswith("."): + raise HTTPException(status_code=400, detail="Invalid filename: path traversal detected") + + # Enforce filename pattern (alphanumeric, underscore, hyphen, dot only) + if not re.match(r"^[A-Za-z0-9_.-]{1,64}$", filename): + raise HTTPException(status_code=400, detail="Invalid filename: contains invalid characters") + + # Resolve the path and ensure it stays within config directory + config_file_path = self.config_path / filename + try: + # Use resolve() to get absolute path and check if it's within config_path + resolved_path = config_file_path.resolve() + if not resolved_path.is_relative_to(self.config_path): + raise HTTPException(status_code=403, detail="Access denied: path outside config directory") + except (OSError, ValueError): + raise HTTPException(status_code=400, detail="Invalid filename") + + return config_file_path + + def _register_sensitive_fields_as_secrets(self, data): + """Register sensitive fields as secrets for automatic redaction in logs.""" + if isinstance(data, dict): + for key, value in data.items(): + if key.lower() in ["password", "password_hash", "api_key", "secret", "token", "key"]: + if isinstance(value, str) and value: + logger.secret(value) + else: + self._register_sensitive_fields_as_secrets(value) + elif isinstance(data, list): + for item in data: + self._register_sensitive_fields_as_secrets(item) + async def get_scheduler_status(self) -> dict: """Get complete scheduler status including schedule configuration and persistence information.""" try: @@ -1616,20 +1665,103 @@ class WebAPI: settings_path = Path(self.default_dir) / "qbm_settings.yml" settings = load_auth_settings(settings_path) - # Don't return the password hash for security + # Don't return sensitive information for security settings.password_hash = "***" if settings.password_hash else "" + settings.api_key = "***" if settings.api_key else "" return settings except Exception as e: logger.error(f"Error getting security settings: {str(e)}") return AuthSettings() - async def update_security_settings(self, request: SecuritySettingsRequest) -> dict[str, Any]: + async def update_security_settings(self, request: SecuritySettingsRequest, req: Request) -> dict[str, Any]: """Update security settings.""" try: settings_path = Path(self.default_dir) / "qbm_settings.yml" current_settings = load_auth_settings(settings_path) + # Capture original values before any modifications for audit logging + original_settings = { + "enabled": current_settings.enabled, + "method": current_settings.method, + "bypass_auth_for_local": current_settings.bypass_auth_for_local, + "username": current_settings.username, + "api_key": current_settings.api_key, + } + + # DEBUG: Log the request data to understand what's being sent + logger.trace( + f"Security settings update request: current_api_key={bool(request.current_api_key)}, " + f"current_username={bool(request.current_username)}, " + f"current_password={bool(request.current_password)}" + ) + logger.trace( + f"Current settings: method={current_settings.method}, " + f"has_api_key={bool(current_settings.api_key)}, " + f"has_username={bool(current_settings.username)}" + ) + + # Verify current credentials for reauthentication + auth_verified = False + + # First, try credentials provided in the request body + # Try API key verification first + if request.current_api_key and current_settings.api_key: + logger.trace("Attempting API key verification from request body") + if verify_api_key(request.current_api_key, current_settings.api_key): + auth_verified = True + logger.trace("API key verification successful") + else: + logger.trace("API key verification failed") + return {"success": False, "message": "Invalid current API key"} + + # If API key not provided or invalid, try username/password + if not auth_verified and request.current_username and request.current_password: + logger.trace("Attempting username/password verification from request body") + if authenticate_user(request.current_username, request.current_password, current_settings): + auth_verified = True + logger.trace("Username/password verification successful") + else: + logger.trace("Username/password verification failed") + return {"success": False, "message": "Invalid current username or password"} + + # If no credentials in request body, try to extract from request headers + if not auth_verified: + logger.trace("No credentials in request body, attempting to extract from headers") + + # Try API key from header + api_key_header = req.headers.get("X-API-Key") + if api_key_header and current_settings.api_key: + logger.trace("Attempting API key verification from X-API-Key header") + if verify_api_key(api_key_header, current_settings.api_key): + auth_verified = True + logger.trace("API key verification from header successful") + + # If API key header didn't work, try Basic auth from Authorization header + if not auth_verified: + auth_header = req.headers.get("Authorization") + if auth_header and auth_header.startswith("Basic "): + logger.trace("Attempting Basic auth verification from Authorization header") + try: + import base64 + + encoded_credentials = auth_header.split(" ")[1] + decoded_credentials = base64.b64decode(encoded_credentials).decode("utf-8") + username, password = decoded_credentials.split(":", 1) + + if authenticate_user(username, password, current_settings): + auth_verified = True + logger.trace("Basic auth verification from header successful") + else: + logger.trace("Basic auth verification from header failed") + except Exception as e: + logger.warning(f"Error parsing Basic auth header: {e}") + + # If neither method worked, require authentication + if not auth_verified: + logger.warning("No valid current credentials provided for security settings update") + return {"success": False, "message": "Current credentials required to update security settings"} + # Update settings current_settings.enabled = request.enabled current_settings.method = request.method @@ -1652,6 +1784,30 @@ class WebAPI: if save_auth_settings(settings_path, current_settings): # Force reload authentication settings to ensure immediate effect AuthenticationMiddleware.force_reload_all_settings() + + # Audit log the security changes by comparing original with updated values + changes = [] + if original_settings["enabled"] != current_settings.enabled: + changes.append(f"enabled: {original_settings['enabled']} -> {current_settings.enabled}") + if original_settings["method"] != current_settings.method: + changes.append(f"method: {original_settings['method']} -> {current_settings.method}") + if original_settings["bypass_auth_for_local"] != current_settings.bypass_auth_for_local: + changes.append( + f"bypass_auth_for_local: {original_settings['bypass_auth_for_local']} -> " + f"{current_settings.bypass_auth_for_local}" + ) + if original_settings["username"] != current_settings.username: + changes.append(f"username: {original_settings['username']} -> {current_settings.username}") + if request.password: + changes.append("password: [CHANGED]") + if request.generate_api_key: + changes.append("api_key: [GENERATED]") + if request.clear_api_key: + changes.append("api_key: [CLEARED]") + + if changes: + logger.info(f"Security settings updated: {', '.join(changes)}") + logger.info("Authentication settings reloaded after update") return { "success": True,