Coverage for src / marimushka / security.py: 100%

98 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-28 17:41 +0000

1"""Security utilities for marimushka. 

2 

3This module provides security-related utilities including path validation, 

4path traversal protection, and other security measures. 

5""" 

6 

7import os 

8from pathlib import Path 

9 

10 

11def validate_path_traversal(path: Path, base_dir: Path | None = None) -> Path: 

12 """Validate that a path does not escape the base directory via path traversal. 

13 

14 Args: 

15 path: The path to validate. 

16 base_dir: The base directory that path should not escape from. 

17 If None, only checks for relative path components that go up. 

18 

19 Returns: 

20 The resolved absolute path if validation passes. 

21 

22 Raises: 

23 ValueError: If the path contains path traversal attempts or escapes base_dir. 

24 

25 Examples: 

26 >>> from pathlib import Path 

27 >>> validate_path_traversal(Path("notebooks/test.py"), Path("/home/user")) # doctest: +SKIP 

28 PosixPath('/home/user/notebooks/test.py') 

29 >>> validate_path_traversal(Path("../../../etc/passwd")) # doctest: +SKIP 

30 Traceback (most recent call last): 

31 ... 

32 ValueError: Path traversal detected: path cannot escape base directory 

33 

34 """ 

35 # Resolve the path to get its absolute form 

36 try: 

37 resolved_path = path.resolve(strict=False) 

38 except (OSError, RuntimeError) as e: 

39 raise ValueError(f"Invalid path: {path}") from e # noqa: TRY003 

40 

41 # If a base directory is provided, ensure the path doesn't escape it 

42 if base_dir is not None: 

43 try: 

44 base_resolved = base_dir.resolve(strict=False) 

45 except (OSError, RuntimeError) as e: 

46 raise ValueError(f"Invalid base directory: {base_dir}") from e # noqa: TRY003 

47 

48 # Check if the resolved path is within the base directory 

49 try: 

50 resolved_path.relative_to(base_resolved) 

51 except ValueError as e: 

52 raise ValueError(f"Path traversal detected: {path} escapes base directory {base_dir}") from e # noqa: TRY003 

53 

54 return resolved_path 

55 

56 

def _check_whitelist(resolved_path: Path, whitelist: list[Path], original_path: Path) -> None:
    """Check if a path is in the whitelist.

    Each whitelist entry is resolved (non-strict) before being compared for
    equality with ``resolved_path``; the comparison stops at the first match.

    Args:
        resolved_path: The resolved absolute path to check.
        whitelist: List of allowed paths.
        original_path: Original path, used only in the error message.

    Raises:
        ValueError: If path is not in whitelist (always the case for an
            empty whitelist).

    """
    is_allowed = any(resolved_path == entry.resolve(strict=False) for entry in whitelist)
    if not is_allowed:
        raise ValueError(f"Binary path not in whitelist: {original_path}")  # noqa: TRY003

72 

73 

74def validate_bin_path(bin_path: Path, whitelist: list[Path] | None = None) -> Path: 

75 """Validate that bin_path is a valid directory and optionally check against whitelist. 

76 

77 Args: 

78 bin_path: Path to the binary directory. 

79 whitelist: Optional list of allowed bin paths. If None, accepts any valid directory. 

80 

81 Returns: 

82 The validated Path object. 

83 

84 Raises: 

85 ValueError: If bin_path is invalid or not in the whitelist. 

86 

87 Examples: 

88 >>> validate_bin_path(Path("/usr/local/bin")) # doctest: +SKIP 

89 PosixPath('/usr/local/bin') 

90 

91 """ 

92 if not bin_path.exists(): 

93 raise ValueError(f"Binary path does not exist: {bin_path}") # noqa: TRY003 

94 

95 if not bin_path.is_dir(): 

96 raise ValueError(f"Binary path is not a directory: {bin_path}") # noqa: TRY003 

97 

98 # Resolve to absolute path to prevent path traversal 

99 resolved_bin_path = bin_path.resolve(strict=True) 

100 

101 # Check against whitelist if provided 

102 if whitelist is not None: 

103 _check_whitelist(resolved_bin_path, whitelist, bin_path) 

104 

105 return resolved_bin_path 

106 

107 

108def validate_file_path(file_path: Path, allowed_extensions: list[str] | None = None) -> Path: 

109 """Validate that a file path exists, is a file, and has an allowed extension. 

110 

111 Args: 

112 file_path: Path to the file. 

113 allowed_extensions: Optional list of allowed file extensions (e.g., ['.py', '.html']). 

114 If None, accepts any extension. 

115 

116 Returns: 

117 The validated Path object. 

118 

119 Raises: 

120 ValueError: If file path is invalid or has a disallowed extension. 

121 

122 Examples: 

123 >>> validate_file_path(Path("test.py"), [".py"]) # doctest: +SKIP 

124 PosixPath('test.py') 

125 

126 """ 

127 if not file_path.exists(): 

128 raise ValueError(f"File does not exist: {file_path}") # noqa: TRY003 

129 

130 if not file_path.is_file(): 

131 raise ValueError(f"Path is not a file: {file_path}") # noqa: TRY003 

132 

133 # Check extension if whitelist provided 

134 if allowed_extensions is not None and file_path.suffix not in allowed_extensions: 

135 msg = f"File extension {file_path.suffix} not allowed. Allowed extensions: {', '.join(allowed_extensions)}" 

136 raise ValueError(msg) 

137 

138 return file_path 

139 

140 

141def sanitize_error_message(error_msg: str, sensitive_patterns: list[str] | None = None) -> str: 

142 """Sanitize error messages by removing potentially sensitive information. 

143 

144 Args: 

145 error_msg: The error message to sanitize. 

146 sensitive_patterns: Optional list of patterns to redact from the message. 

147 If None, uses default patterns (absolute paths, user info). 

148 

149 Returns: 

150 The sanitized error message. 

151 

152 Examples: 

153 >>> sanitize_error_message("Error in /home/user/secret/file.py") 

154 'Error in <redacted_path>/file.py' 

155 

156 """ 

157 if sensitive_patterns is None: 

158 # Default: redact absolute paths while keeping filename 

159 sensitive_patterns = [] 

160 

161 sanitized = error_msg 

162 

163 # Redact absolute paths but keep the filename 

164 import re 

165 

166 # Pattern to match absolute paths (Unix and Windows) 

167 path_pattern = r"(?:(?:[A-Za-z]:)?[/\\](?:[^/\\:\n]+[/\\])+)([^/\\:\n]+)" 

168 

169 def redact_path(match: re.Match[str]) -> str: 

170 filename = match.group(1) 

171 return f"<redacted_path>/{filename}" 

172 

173 sanitized = re.sub(path_pattern, redact_path, sanitized) 

174 

175 # Redact custom patterns 

176 for pattern in sensitive_patterns: 

177 sanitized = sanitized.replace(pattern, "<redacted>") 

178 

179 return sanitized 

180 

181 

def validate_max_workers(max_workers: int, min_workers: int = 1, max_allowed: int = 16) -> int:
    """Validate and bound the number of worker threads.

    Out-of-range requests are not rejected; they are clamped into
    ``[min_workers, max_allowed]``.

    Args:
        max_workers: The requested number of workers.
        min_workers: Minimum allowed workers. Defaults to 1.
        max_allowed: Maximum allowed workers. Defaults to 16.

    Returns:
        The validated worker count, bounded to the allowed range.

    Raises:
        TypeError: If max_workers is not an integer.
        ValueError: If min_workers is below 1 or max_allowed is below min_workers.

    Examples:
        >>> validate_max_workers(4)
        4
        >>> validate_max_workers(100)
        16
        >>> validate_max_workers(0)
        1

    """
    if not isinstance(max_workers, int):
        raise TypeError(f"max_workers must be an integer, got {type(max_workers).__name__}")  # noqa: TRY003

    if min_workers < 1:
        raise ValueError(f"min_workers must be at least 1, got {min_workers}")  # noqa: TRY003

    if max_allowed < min_workers:
        raise ValueError(f"max_allowed ({max_allowed}) must be >= min_workers ({min_workers})")  # noqa: TRY003

    # Clamp: cap at max_allowed first, then raise to min_workers if needed
    capped = min(max_workers, max_allowed)
    return max(min_workers, capped)

216 

217 

def validate_file_size(file_path: Path, max_size_bytes: int = 10 * 1024 * 1024) -> bool:
    """Validate that a file's size is within acceptable limits.

    This helps prevent DoS attacks via extremely large files.

    Args:
        file_path: Path to the file to check.
        max_size_bytes: Maximum allowed file size in bytes. Defaults to 10MB.
            A file of exactly this size is accepted.

    Returns:
        True if file size is acceptable.

    Raises:
        ValueError: If the file doesn't exist, its size cannot be read, or the
            size exceeds the limit.

    Examples:
        >>> from pathlib import Path
        >>> # File within limit
        >>> validate_file_size(Path("small.txt"), max_size_bytes=1024*1024)  # doctest: +SKIP
        True

    """
    # A single stat() both proves existence and yields the size, so there is
    # no window between an existence check and the size read.
    try:
        file_size = file_path.stat().st_size
    except (FileNotFoundError, NotADirectoryError) as e:
        raise ValueError(f"File does not exist: {file_path}") from e  # noqa: TRY003
    except OSError as e:
        raise ValueError(f"Cannot read file size: {file_path}") from e  # noqa: TRY003

    if file_size > max_size_bytes:
        bytes_per_mb = 1024 * 1024
        max_mb = max_size_bytes / bytes_per_mb
        actual_mb = file_size / bytes_per_mb
        msg = f"File size {actual_mb:.2f}MB exceeds limit of {max_mb:.2f}MB: {file_path}"
        raise ValueError(msg)

    return True

255 

256 

def safe_open_file(file_path: Path, mode: str = "r") -> int:
    """Safely open a file and return a file descriptor to avoid TOCTOU races.

    This function uses os.open with O_NOFOLLOW (where the platform provides
    it) to prevent symlink attacks and returns a raw file descriptor. The
    caller owns the descriptor and should wrap it with os.fdopen (or close it
    with os.close).

    Args:
        file_path: Path to the file to open.
        mode: File open mode. Only 'r' (read), 'w' (write, create/truncate)
            and 'a' (append, create) are supported.

    Returns:
        File descriptor that should be used with os.fdopen.

    Raises:
        ValueError: If the file does not exist (read mode), the path is a
            symlink, the mode is unsupported, or the file cannot be opened.
            The underlying OSError, if any, is attached as the cause.

    Examples:
        >>> from pathlib import Path
        >>> import os
        >>> # Safe file open
        >>> fd = safe_open_file(Path("test.txt"), "w")  # doctest: +SKIP
        >>> with os.fdopen(fd, "w") as f:  # doctest: +SKIP
        ...     f.write("safe content")

    """
    flags_by_mode = {
        "r": os.O_RDONLY,
        "w": os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
        "a": os.O_WRONLY | os.O_CREAT | os.O_APPEND,
    }

    # Validate path first: only creating modes may target a missing file
    if not file_path.exists() and "w" not in mode and "a" not in mode:
        raise ValueError(f"File does not exist: {file_path}")  # noqa: TRY003

    # Reject symlinks up front. is_symlink() does not follow the link, so this
    # also catches dangling symlinks, for which exists() reports False.
    if file_path.is_symlink():
        raise ValueError(f"Cannot open symlink: {file_path}")  # noqa: TRY003

    if mode not in flags_by_mode:
        raise ValueError(f"Unsupported mode: {mode}")  # noqa: TRY003

    # O_NOFOLLOW closes the race between the check above and the open below.
    # It is not defined on every platform, so fall back to no extra flag
    # instead of failing with AttributeError.
    flags = flags_by_mode[mode] | getattr(os, "O_NOFOLLOW", 0)

    # 0o600 (owner read/write only) is the permission argument given to os.open
    try:
        return os.open(file_path, flags, mode=0o600)
    except OSError as e:
        raise ValueError(f"Cannot open file: {file_path}") from e  # noqa: TRY003

309 

310 

def set_secure_file_permissions(file_path: Path, mode: int = 0o644) -> None:
    """Set secure permissions on a file.

    Args:
        file_path: Path to the file.
        mode: Permission mode (default: 0o644 = rw-r--r--).

    Raises:
        ValueError: If file doesn't exist or permissions cannot be set. The
            underlying OSError is attached as the cause.

    Examples:
        >>> from pathlib import Path
        >>> set_secure_file_permissions(Path("test.txt"), 0o600)  # doctest: +SKIP

    """
    # Attempt the chmod directly and classify the failure, rather than
    # checking existence first and racing against a concurrent delete.
    try:
        os.chmod(file_path, mode)
    except (FileNotFoundError, NotADirectoryError) as e:
        raise ValueError(f"File does not exist: {file_path}") from e  # noqa: TRY003
    except OSError as e:
        raise ValueError(f"Cannot set permissions on {file_path}") from e  # noqa: TRY003