Coverage for src / marimushka / security.py: 100%
98 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-28 17:41 +0000
1"""Security utilities for marimushka.
3This module provides security-related utilities including path validation,
4path traversal protection, and other security measures.
5"""
7import os
8from pathlib import Path
11def validate_path_traversal(path: Path, base_dir: Path | None = None) -> Path:
12 """Validate that a path does not escape the base directory via path traversal.
14 Args:
15 path: The path to validate.
16 base_dir: The base directory that path should not escape from.
17 If None, only checks for relative path components that go up.
19 Returns:
20 The resolved absolute path if validation passes.
22 Raises:
23 ValueError: If the path contains path traversal attempts or escapes base_dir.
25 Examples:
26 >>> from pathlib import Path
27 >>> validate_path_traversal(Path("notebooks/test.py"), Path("/home/user")) # doctest: +SKIP
28 PosixPath('/home/user/notebooks/test.py')
29 >>> validate_path_traversal(Path("../../../etc/passwd")) # doctest: +SKIP
30 Traceback (most recent call last):
31 ...
32 ValueError: Path traversal detected: path cannot escape base directory
34 """
35 # Resolve the path to get its absolute form
36 try:
37 resolved_path = path.resolve(strict=False)
38 except (OSError, RuntimeError) as e:
39 raise ValueError(f"Invalid path: {path}") from e # noqa: TRY003
41 # If a base directory is provided, ensure the path doesn't escape it
42 if base_dir is not None:
43 try:
44 base_resolved = base_dir.resolve(strict=False)
45 except (OSError, RuntimeError) as e:
46 raise ValueError(f"Invalid base directory: {base_dir}") from e # noqa: TRY003
48 # Check if the resolved path is within the base directory
49 try:
50 resolved_path.relative_to(base_resolved)
51 except ValueError as e:
52 raise ValueError(f"Path traversal detected: {path} escapes base directory {base_dir}") from e # noqa: TRY003
54 return resolved_path
57def _check_whitelist(resolved_path: Path, whitelist: list[Path], original_path: Path) -> None:
58 """Check if a path is in the whitelist.
60 Args:
61 resolved_path: The resolved absolute path to check.
62 whitelist: List of allowed paths.
63 original_path: Original path for error message.
65 Raises:
66 ValueError: If path is not in whitelist.
68 """
69 resolved_whitelist = [p.resolve(strict=False) for p in whitelist]
70 if resolved_path not in resolved_whitelist:
71 raise ValueError(f"Binary path not in whitelist: {original_path}") # noqa: TRY003
74def validate_bin_path(bin_path: Path, whitelist: list[Path] | None = None) -> Path:
75 """Validate that bin_path is a valid directory and optionally check against whitelist.
77 Args:
78 bin_path: Path to the binary directory.
79 whitelist: Optional list of allowed bin paths. If None, accepts any valid directory.
81 Returns:
82 The validated Path object.
84 Raises:
85 ValueError: If bin_path is invalid or not in the whitelist.
87 Examples:
88 >>> validate_bin_path(Path("/usr/local/bin")) # doctest: +SKIP
89 PosixPath('/usr/local/bin')
91 """
92 if not bin_path.exists():
93 raise ValueError(f"Binary path does not exist: {bin_path}") # noqa: TRY003
95 if not bin_path.is_dir():
96 raise ValueError(f"Binary path is not a directory: {bin_path}") # noqa: TRY003
98 # Resolve to absolute path to prevent path traversal
99 resolved_bin_path = bin_path.resolve(strict=True)
101 # Check against whitelist if provided
102 if whitelist is not None:
103 _check_whitelist(resolved_bin_path, whitelist, bin_path)
105 return resolved_bin_path
108def validate_file_path(file_path: Path, allowed_extensions: list[str] | None = None) -> Path:
109 """Validate that a file path exists, is a file, and has an allowed extension.
111 Args:
112 file_path: Path to the file.
113 allowed_extensions: Optional list of allowed file extensions (e.g., ['.py', '.html']).
114 If None, accepts any extension.
116 Returns:
117 The validated Path object.
119 Raises:
120 ValueError: If file path is invalid or has a disallowed extension.
122 Examples:
123 >>> validate_file_path(Path("test.py"), [".py"]) # doctest: +SKIP
124 PosixPath('test.py')
126 """
127 if not file_path.exists():
128 raise ValueError(f"File does not exist: {file_path}") # noqa: TRY003
130 if not file_path.is_file():
131 raise ValueError(f"Path is not a file: {file_path}") # noqa: TRY003
133 # Check extension if whitelist provided
134 if allowed_extensions is not None and file_path.suffix not in allowed_extensions:
135 msg = f"File extension {file_path.suffix} not allowed. Allowed extensions: {', '.join(allowed_extensions)}"
136 raise ValueError(msg)
138 return file_path
141def sanitize_error_message(error_msg: str, sensitive_patterns: list[str] | None = None) -> str:
142 """Sanitize error messages by removing potentially sensitive information.
144 Args:
145 error_msg: The error message to sanitize.
146 sensitive_patterns: Optional list of patterns to redact from the message.
147 If None, uses default patterns (absolute paths, user info).
149 Returns:
150 The sanitized error message.
152 Examples:
153 >>> sanitize_error_message("Error in /home/user/secret/file.py")
154 'Error in <redacted_path>/file.py'
156 """
157 if sensitive_patterns is None:
158 # Default: redact absolute paths while keeping filename
159 sensitive_patterns = []
161 sanitized = error_msg
163 # Redact absolute paths but keep the filename
164 import re
166 # Pattern to match absolute paths (Unix and Windows)
167 path_pattern = r"(?:(?:[A-Za-z]:)?[/\\](?:[^/\\:\n]+[/\\])+)([^/\\:\n]+)"
169 def redact_path(match: re.Match[str]) -> str:
170 filename = match.group(1)
171 return f"<redacted_path>/{filename}"
173 sanitized = re.sub(path_pattern, redact_path, sanitized)
175 # Redact custom patterns
176 for pattern in sensitive_patterns:
177 sanitized = sanitized.replace(pattern, "<redacted>")
179 return sanitized
def validate_max_workers(max_workers: int, min_workers: int = 1, max_allowed: int = 16) -> int:
    """Validate and bound the number of worker threads.

    Out-of-range requests are not an error: the value is clamped into
    ``[min_workers, max_allowed]``.

    Args:
        max_workers: The requested number of workers.
        min_workers: Minimum allowed workers. Defaults to 1.
        max_allowed: Maximum allowed workers. Defaults to 16.

    Returns:
        The validated worker count, bounded to the allowed range.

    Raises:
        TypeError: If max_workers is not an integer (booleans are rejected too).
        ValueError: If min_workers is below 1 or max_allowed is below min_workers.

    Examples:
        >>> validate_max_workers(4)
        4
        >>> validate_max_workers(100)
        16
        >>> validate_max_workers(0)
        1

    """
    # bool is a subclass of int; without the explicit exclusion True/False
    # would silently be treated as 1/0 workers.
    if isinstance(max_workers, bool) or not isinstance(max_workers, int):
        raise TypeError(f"max_workers must be an integer, got {type(max_workers).__name__}")  # noqa: TRY003

    if min_workers < 1:
        raise ValueError(f"min_workers must be at least 1, got {min_workers}")  # noqa: TRY003

    if max_allowed < min_workers:
        raise ValueError(f"max_allowed ({max_allowed}) must be >= min_workers ({min_workers})")  # noqa: TRY003

    # Clamp: cap at max_allowed first, then raise to at least min_workers
    return max(min_workers, min(max_workers, max_allowed))
def validate_file_size(file_path: Path, max_size_bytes: int = 10 * 1024 * 1024) -> bool:
    """Check that a file is no larger than max_size_bytes.

    Intended as a guard against DoS via extremely large files.

    Args:
        file_path: Path to the file to check.
        max_size_bytes: Largest acceptable size in bytes (inclusive).
            Defaults to 10MB.

    Returns:
        True when the file size is within the limit. The function never
        returns False; an oversized file raises instead.

    Raises:
        ValueError: If the file doesn't exist, its size cannot be read, or
            the size exceeds max_size_bytes.

    Examples:
        >>> from pathlib import Path
        >>> validate_file_size(Path("small.txt"), max_size_bytes=1024*1024)  # doctest: +SKIP
        True

    """
    if not file_path.exists():
        raise ValueError(f"File does not exist: {file_path}")  # noqa: TRY003

    try:
        size_in_bytes = file_path.stat().st_size
    except OSError as stat_error:
        raise ValueError(f"Cannot read file size: {file_path}") from stat_error  # noqa: TRY003

    if size_in_bytes > max_size_bytes:
        # Report both figures in megabytes for a readable message.
        bytes_per_mb = 1024 * 1024
        max_mb = max_size_bytes / bytes_per_mb
        actual_mb = size_in_bytes / bytes_per_mb
        raise ValueError(f"File size {actual_mb:.2f}MB exceeds limit of {max_mb:.2f}MB: {file_path}")

    return True
def safe_open_file(file_path: Path, mode: str = "r") -> int:
    """Safely open a file and return a file descriptor to avoid TOCTOU races.

    The path is refused if it is a symbolic link (including a dangling one),
    and the file is opened with ``os.open`` using ``O_NOFOLLOW`` where the
    platform provides it, so a link swapped in after the check is still not
    followed. Newly created files get owner read/write permissions (0o600,
    subject to the process umask).

    Args:
        file_path: Path to the file to open.
        mode: One of 'r' (read), 'w' (write, create/truncate) or
            'a' (append, create). Other modes are rejected.

    Returns:
        A raw file descriptor. Wrap it with ``os.fdopen`` (or close it with
        ``os.close``); the caller owns it.

    Raises:
        ValueError: If the path is a symlink, the file does not exist when
            reading, the mode is unsupported, or the operating system refuses
            to open the file (the original OSError is chained as the cause).

    Examples:
        >>> from pathlib import Path
        >>> import os
        >>> fd = safe_open_file(Path("test.txt"), "w")  # doctest: +SKIP
        >>> with os.fdopen(fd, "w") as f:  # doctest: +SKIP
        ...     f.write("safe content")

    """
    # is_symlink() does not follow the link, so it also catches dangling
    # symlinks, which exists() (link-following) reports as missing.
    if file_path.is_symlink():
        raise ValueError(f"Cannot open symlink: {file_path}")  # noqa: TRY003

    # Only modes that create the file may target a path that does not exist yet
    if "w" not in mode and "a" not in mode and not file_path.exists():
        raise ValueError(f"File does not exist: {file_path}")  # noqa: TRY003

    access_flags = {
        "r": os.O_RDONLY,
        "w": os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
        "a": os.O_WRONLY | os.O_CREAT | os.O_APPEND,
    }
    if mode not in access_flags:
        raise ValueError(f"Unsupported mode: {mode}")  # noqa: TRY003

    # O_NOFOLLOW closes the window between the symlink check above and the
    # open itself. It is missing on some platforms (e.g. Windows); fall back
    # to 0 there instead of failing with AttributeError.
    flags = access_flags[mode] | getattr(os, "O_NOFOLLOW", 0)

    # Open with restricted permissions (owner read/write only)
    try:
        return os.open(file_path, flags, mode=0o600)
    except OSError as e:
        raise ValueError(f"Cannot open file: {file_path}") from e  # noqa: TRY003
def set_secure_file_permissions(file_path: Path, mode: int = 0o644) -> None:
    """Apply the given permission bits to an existing file.

    Args:
        file_path: Path to the file.
        mode: Permission bits passed to ``os.chmod``
            (default: 0o644 = rw-r--r--).

    Raises:
        ValueError: If the file doesn't exist or the permissions cannot be
            set (the underlying OSError is chained as the cause).

    Examples:
        >>> from pathlib import Path
        >>> set_secure_file_permissions(Path("test.txt"), 0o600)  # doctest: +SKIP

    """
    target_is_present = file_path.exists()
    if not target_is_present:
        raise ValueError(f"File does not exist: {file_path}")  # noqa: TRY003

    try:
        os.chmod(file_path, mode)
    except OSError as chmod_error:
        raise ValueError(f"Cannot set permissions on {file_path}") from chmod_error  # noqa: TRY003