Coverage for src / marimushka / audit.py: 100%

38 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-28 17:41 +0000

1"""Audit logging for security-relevant events. 

2 

3This module provides structured audit logging for security-critical operations 

4such as file access, export operations, and configuration changes. 

5""" 

6 

7import json 

8from datetime import UTC, datetime 

9from pathlib import Path 

10from typing import Any 

11 

12from loguru import logger 

13 

14 

15class AuditLogger: 

16 """Audit logger for security-relevant events. 

17 

18 This class provides structured logging for security events with 

19 consistent formatting and optional file output. 

20 

21 Attributes: 

22 enabled: Whether audit logging is enabled. 

23 log_file: Optional path to audit log file. 

24 

25 """ 

26 

27 def __init__(self, enabled: bool = True, log_file: Path | None = None) -> None: 

28 """Initialize the audit logger. 

29 

30 Args: 

31 enabled: Whether to enable audit logging. Defaults to True. 

32 log_file: Optional path to write audit logs to file. 

33 

34 """ 

35 self.enabled = enabled 

36 self.log_file = log_file 

37 

38 if self.log_file: 

39 # Ensure audit log directory exists 

40 self.log_file.parent.mkdir(parents=True, exist_ok=True) 

41 

42 def _log_event(self, event_type: str, details: dict[str, Any]) -> None: 

43 """Log an audit event. 

44 

45 Args: 

46 event_type: Type of event (e.g., 'path_validation', 'export'). 

47 details: Dictionary containing event details. 

48 

49 """ 

50 if not self.enabled: 

51 return 

52 

53 audit_entry = { 

54 "timestamp": datetime.now(UTC).isoformat(), 

55 "event_type": event_type, 

56 **details, 

57 } 

58 

59 # Log to structured logger 

60 logger.info(f"[AUDIT] {event_type}", extra={"audit": audit_entry}) 

61 

62 # Optionally write to file 

63 if self.log_file: 

64 try: 

65 with self.log_file.open("a") as f: 

66 f.write(json.dumps(audit_entry) + "\n") 

67 except OSError as e: # pragma: no cover 

68 logger.error(f"Failed to write audit log: {e}") 

69 

70 def log_path_validation(self, path: Path, validation_type: str, success: bool, reason: str | None = None) -> None: 

71 """Log a path validation event. 

72 

73 Args: 

74 path: The path that was validated. 

75 validation_type: Type of validation (e.g., 'traversal', 'bin_path'). 

76 success: Whether validation succeeded. 

77 reason: Optional reason for failure. 

78 

79 """ 

80 self._log_event( 

81 "path_validation", 

82 { 

83 "path": str(path), 

84 "validation_type": validation_type, 

85 "success": success, 

86 "reason": reason, 

87 }, 

88 ) 

89 

90 def log_export( 

91 self, notebook_path: Path, output_path: Path | None, success: bool, error: str | None = None 

92 ) -> None: 

93 """Log a notebook export event. 

94 

95 Args: 

96 notebook_path: Path to the notebook being exported. 

97 output_path: Path to the output file (if successful). 

98 success: Whether export succeeded. 

99 error: Optional error message. 

100 

101 """ 

102 self._log_event( 

103 "export", 

104 { 

105 "notebook_path": str(notebook_path), 

106 "output_path": str(output_path) if output_path else None, 

107 "success": success, 

108 "error": error, 

109 }, 

110 ) 

111 

112 def log_template_render(self, template_path: Path, success: bool, error: str | None = None) -> None: 

113 """Log a template rendering event. 

114 

115 Args: 

116 template_path: Path to the template being rendered. 

117 success: Whether rendering succeeded. 

118 error: Optional error message. 

119 

120 """ 

121 self._log_event( 

122 "template_render", 

123 { 

124 "template_path": str(template_path), 

125 "success": success, 

126 "error": error, 

127 }, 

128 ) 

129 

130 def log_config_load(self, config_path: Path | None, success: bool, error: str | None = None) -> None: 

131 """Log a configuration load event. 

132 

133 Args: 

134 config_path: Path to config file (or None if using defaults). 

135 success: Whether loading succeeded. 

136 error: Optional error message. 

137 

138 """ 

139 self._log_event( 

140 "config_load", 

141 { 

142 "config_path": str(config_path) if config_path else None, 

143 "success": success, 

144 "error": error, 

145 }, 

146 ) 

147 

148 def log_file_access(self, file_path: Path, operation: str, success: bool, error: str | None = None) -> None: 

149 """Log a file access event. 

150 

151 Args: 

152 file_path: Path to the file being accessed. 

153 operation: Type of operation (e.g., 'read', 'write'). 

154 success: Whether operation succeeded. 

155 error: Optional error message. 

156 

157 """ 

158 self._log_event( 

159 "file_access", 

160 { 

161 "file_path": str(file_path), 

162 "operation": operation, 

163 "success": success, 

164 "error": error, 

165 }, 

166 ) 

167 

168 

169# Global audit logger instance 

170_audit_logger: AuditLogger | None = None 

171 

172 

173def get_audit_logger() -> AuditLogger: 

174 """Get the global audit logger instance. 

175 

176 Returns: 

177 The global AuditLogger instance. 

178 

179 """ 

180 global _audit_logger 

181 if _audit_logger is None: 

182 _audit_logger = AuditLogger() 

183 return _audit_logger 

184 

185 

186def init_audit_logger(enabled: bool = True, log_file: Path | None = None) -> AuditLogger: 

187 """Initialize the global audit logger. 

188 

189 Args: 

190 enabled: Whether to enable audit logging. 

191 log_file: Optional path to audit log file. 

192 

193 Returns: 

194 The initialized AuditLogger instance. 

195 

196 """ 

197 global _audit_logger 

198 _audit_logger = AuditLogger(enabled=enabled, log_file=log_file) 

199 return _audit_logger