Coverage for stlog/formatter.py: 90%

208 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2024-08-21 07:31 +0000

1from __future__ import annotations 

2 

3import fnmatch 

4import json 

5import logging 

6import re 

7import time 

8from dataclasses import dataclass, field 

9from typing import Any, Callable, Sequence 

10 

11from stlog.base import ( 

12 GLOBAL_LOGGING_CONFIG, 

13 STLOG_EXTRA_KEY, 

14 format_string, 

15 logfmt_format_value, 

16 parse_format, 

17 rich_markup_escape, 

18) 

19from stlog.kvformatter import ( 

20 JsonKVFormatter, 

21 KVFormatter, 

22 LogFmtKVFormatter, 

23 _truncate_serialize, 

24 _truncate_str, 

25) 

26 

# Default `fmt` used by HumanFormatter when none is given ("{"-style placeholders).
27DEFAULT_STLOG_HUMAN_FORMAT = "{asctime} {name} [{levelname:^10s}] {message}{extras}" 

# Default `fmt` used by RichHumanFormatter; `rich_level_style` and
# `rich_escaped_message` are extra record attributes set by that formatter.
28DEFAULT_STLOG_RICH_HUMAN_FORMAT = ":arrow_forward: [log.time]{asctime}[/log.time] {name} [{rich_level_style}]{levelname:^8s}[/{rich_level_style}] [bold]{rich_escaped_message}[/bold]{extras}" 

# Default `fmt` used by LogFmtFormatter when none is given.
29DEFAULT_STLOG_LOGFMT_FORMAT = ( 

30 "time={asctime} logger={name} level={levelname} message={message}{extras}" 

31) 

# Default `fmt` used by JsonFormatter: a JSON template in which doubled braces
# are literal braces and each placeholder is replaced by a JSON-encoded value;
# JsonFormatter parses the rendered template with json.loads.
32DEFAULT_STLOG_JSON_FORMAT = """ 

33{{ 

34 "time": {asctime}, 

35 "logger": {name}, 

36 "level": {levelname}, 

37 "message": {message}, 

38 "source": {{ 

39 "path": {pathname}, 

40 "lineno": {lineno}, 

41 "module": {module}, 

42 "funcName": {funcName}, 

43 "process": {process}, 

44 "processName": {processName}, 

45 "thread": {thread}, 

46 "threadName": {threadName} 

47 }} 

48}} 

49""" 

# Same template shape as above but with "timestamp" / "severity" keys instead of
# "time" / "level". NOTE(review): not referenced in the visible part of this
# file; presumably selected by callers targeting GCP logging, to be confirmed.
50DEFAULT_STLOG_GCP_JSON_FORMAT = """ 

51{{ 

52 "timestamp": {asctime}, 

53 "logger": {name}, 

54 "severity": {levelname}, 

55 "message": {message}, 

56 "source": {{ 

57 "path": {pathname}, 

58 "lineno": {lineno}, 

59 "module": {module}, 

60 "funcName": {funcName}, 

61 "process": {process}, 

62 "processName": {processName}, 

63 "thread": {thread}, 

64 "threadName": {threadName} 

65 }} 

66}} 

67""" 

# Default `datefmt` of Formatter (strftime pattern with a literal trailing "Z").
68DEFAULT_STLOG_DATE_FORMAT = "%Y-%m-%dT%H:%M:%SZ" 

69 

70 

71def _unit_tests_converter(val: float | None) -> time.struct_time: 

72 # always the same value 

73 return time.gmtime(1680101317) 

74 

75 

@dataclass
class Formatter(logging.Formatter):
    """Abstract base class for `stlog` formatters.

    Attributes:
        fmt: the default format for the formatter.
        datefmt: the format to use for `{asctime}` placeholder.
        style: can be '%', '{' or '$' to select how the format string will be merged with its data
            (see https://docs.python.org/3/library/logging.html#logging.Formatter for details)
        include_extras_keys_fnmatchs: fnmatch patterns list for including keys in `{extras}` placeholder
            (all keys are included when not set).
        exclude_extras_keys_fnmatchs: fnmatch patterns list for excluding keys in `{extras}` placeholder
            (no key is excluded when not set).
        extra_key_rename_fn: callable which takes a key name and returns a renamed key to use
            (or None to ignore the key/value).
        extra_key_max_length: maximum size of extra keys to be included in `{extras}` placeholder
            (after this limit, the key will be truncated and ... will be added at the end,
            0 means "no limit"). When left to None, this base class uses 32.
        converter: time converter function (use `time.gmtime` (default) for UTC date/times, use
            `time.localtime` for local date/times), if you change the default, please change
            also `datefmt` keyword.
        kv_formatter: key values special formatter for formatting `{extras}`
            placeholder.
        include_reserved_attrs_in_extras: automatically include some reserved
            logrecord attributes in "extras" (example: `["process", "thread"]`).

    """

    fmt: str | None = None
    datefmt: str | None = DEFAULT_STLOG_DATE_FORMAT
    style: str = "{"
    include_extras_keys_fnmatchs: Sequence[str] | None = None
    exclude_extras_keys_fnmatchs: Sequence[str] | None = None
    extra_key_rename_fn: Callable[[str], str | None] | None = None
    extra_key_max_length: int | None = None
    converter: Callable[[float | None], time.struct_time] = time.gmtime
    kv_formatter: KVFormatter | None = None
    include_reserved_attrs_in_extras: Sequence[str] = field(default_factory=list)
    # lazily-filled cache for the `placeholders_in_fmt` property
    _placeholders_in_fmt: list[str] | None = field(
        init=False, default=None, repr=False, compare=False
    )

    def __post_init__(self):
        """Initialize the `logging.Formatter` part and compile the key filters."""
        super().__init__(  # explicit call because logging.Formatter is not a dataclass
            fmt=self.fmt, datefmt=self.datefmt, style=self.style  # type: ignore
        )
        if self.extra_key_max_length is None:
            self.extra_key_max_length = 32
        self.include_extra_keys_patterns: list[re.Pattern] = [
            re.compile(fnmatch.translate("*"))
        ]  # all by default
        self.exclude_extra_keys_patterns: list[re.Pattern] = []  # empty by default
        if self.include_extras_keys_fnmatchs is not None:
            self.include_extra_keys_patterns = [
                re.compile(fnmatch.translate(x))
                for x in self.include_extras_keys_fnmatchs
            ]
        if self.exclude_extras_keys_fnmatchs is not None:
            self.exclude_extra_keys_patterns = [
                re.compile(fnmatch.translate(x))
                for x in self.exclude_extras_keys_fnmatchs
            ]
        if GLOBAL_LOGGING_CONFIG._unit_tests_mode:
            # fixed timestamps for reproducible test output
            self.converter = _unit_tests_converter

    @property
    def placeholders_in_fmt(self) -> list[str]:
        """Placeholder names found in `fmt` (parsed once, then cached)."""
        if self._placeholders_in_fmt is None:
            self._placeholders_in_fmt = parse_format(self.fmt, self.style)
        return self._placeholders_in_fmt

    def _make_extras_string(
        self, record: logging.LogRecord, extra_kvs: dict[str, Any] | None = None
    ) -> str:
        """Build the string used for the `{extras}` placeholder.

        Args:
            record: the log record being formatted.
            extra_kvs: additional key/values merged last, unfiltered and
                overriding record keys of the same name.

        Returns:
            The output of `kv_formatter`, or "" when there is no
            `kv_formatter` or nothing to format.
        """
        if self.kv_formatter is None:
            return ""
        has_stlog_extras = hasattr(record, STLOG_EXTRA_KEY)
        if not has_stlog_extras and not extra_kvs:
            return ""
        kvs: dict[str, Any] = {}
        if has_stlog_extras:
            for k in list(getattr(record, STLOG_EXTRA_KEY)) + list(
                self.include_reserved_attrs_in_extras
            ):
                key = self._make_extra_key_name(k)
                if key:
                    kvs[key] = getattr(record, k)
        # Merged even for records without the stlog extras attribute, so that
        # caller-provided values (e.g. exception text) are not silently lost.
        if extra_kvs:
            kvs.update(extra_kvs)
        return self.kv_formatter.format(kvs)

    def _make_extra_key_name(self, extra_key: str) -> str | None:
        """Return the final name for *extra_key*, or None to drop the key.

        The key is first renamed with `extra_key_rename_fn` (if any); the
        renamed key must match at least one include pattern and no exclude
        pattern, and is finally truncated to `extra_key_max_length`.
        """
        new_extra_key: str | None = extra_key
        if self.extra_key_rename_fn is not None:
            new_extra_key = (self.extra_key_rename_fn)(extra_key)
        if new_extra_key is None:
            return None
        if not any(
            pattern.match(new_extra_key)
            for pattern in self.include_extra_keys_patterns
        ):
            return None
        if any(
            pattern.match(new_extra_key)
            for pattern in self.exclude_extra_keys_patterns
        ):
            return None
        return _truncate_str(
            new_extra_key,
            self.extra_key_max_length if self.extra_key_max_length is not None else 0,
        )

    def format(self, record: logging.LogRecord) -> str:
        """Format *record* with `logging.Formatter.format`.

        In unit-tests mode, environment-dependent record fields are first
        overwritten with fixed values.
        """
        if GLOBAL_LOGGING_CONFIG._unit_tests_mode:
            # FIXME: it would be better as a Filter, wouldn't be?
            # fix some fields in record to get always the same values
            record.filename = "filename.py"
            record.pathname = "/path/filename.py"
            record.thread = 12345
            record.process = 6789
            record.processName = "MainProcess"
            record.threadName = "MainThread"
        return super().format(record)

192 

193 

# Adapted from https://github.com/Mergifyio/daiquiri/blob/main/daiquiri/formatter.py
@dataclass
class HumanFormatter(Formatter):
    """Formatter for a "human" output.

    Extra keywords are merged into a `{extras}` placeholder by a `stlog.kvformatter.KVFormatter`.

    If you use this placeholder on your `fmt`, any keywords
    passed to a logging call will be formatted into a "extras" string and
    included in a logging message.

    Example::

        logger.info("my message", foo="bar", foo2=123)

    will cause an "extras" string of::

        {foo=bar foo2=123}

    You can change the way the `{extras}` placeholder is formatted
    by providing a KVFormatter object.

    Unless `exclude_extras_keys_fnmatchs` is given, keys matching `_*` are
    excluded from the "extras" string.

    """

    def __post_init__(self):
        """Apply the human-output defaults, then run the base initialization."""
        if self.fmt is None:
            self.fmt = DEFAULT_STLOG_HUMAN_FORMAT
        if self.kv_formatter is None:
            self.kv_formatter = LogFmtKVFormatter()
        if self.exclude_extras_keys_fnmatchs is None:
            self.exclude_extras_keys_fnmatchs = ("_*",)
        super().__post_init__()

    def _add_extras(self, record: logging.LogRecord) -> None:
        """Attach the temporary `extras` attribute used by the format string."""
        record.extras = self._make_extras_string(record)

    def _remove_extras(self, record: logging.LogRecord) -> None:
        """Remove the temporary `extras` attribute from the record."""
        delattr(record, "extras")

    def format(self, record: logging.LogRecord) -> str:
        """Format *record*, filling `{extras}` only when `fmt` uses it."""
        if "extras" not in self.placeholders_in_fmt:
            return super().format(record)
        self._add_extras(record)
        try:
            return super().format(record)
        finally:
            # always clean up, even when formatting raises, so the temporary
            # attribute never stays on a record shared with other handlers
            self._remove_extras(record)

240 

241 

@dataclass
class RichHumanFormatter(HumanFormatter):
    """`HumanFormatter` variant whose default format contains rich markup.

    On top of `{extras}`, three placeholders are made available to `fmt`:

    - `{rich_escaped_message}`: the log message passed through
      `rich_markup_escape`,
    - `{rich_level_style}`: a `logging.level.*` style name chosen from the
      record level,
    - `{rich_escaped_extras}`: the extras string passed through
      `rich_markup_escape` (only set when `fmt` contains `{extras}`).

    Exception and stack texts are never appended to the formatted message.
    """

    def __post_init__(self):
        """Apply the rich defaults, then run the `HumanFormatter` initialization."""
        if self.kv_formatter is None:
            self.kv_formatter = LogFmtKVFormatter(
                prefix="\n    :arrow_right_hook: ",
                suffix="",
                template="[repr.attrib_name]{key}[/repr.attrib_name][repr.attrib_equal]=[/repr.attrib_equal][repr.attrib_value]{value}[/repr.attrib_value]",
            )
        if self.fmt is None:
            self.fmt = DEFAULT_STLOG_RICH_HUMAN_FORMAT
        super().__post_init__()

    def _add_rich_attrs(self, record: logging.LogRecord) -> None:
        """Set the rich attributes that do not depend on the extras string."""
        record.rich_escaped_message = rich_markup_escape(record.getMessage())
        level = record.levelname.lower()
        if level in ["notset", "debug", "info", "critical"]:
            record.rich_level_style = f"logging.level.{level}"
        elif level == "warning":
            # warnings use the "error" style and errors the "critical" style
            record.rich_level_style = "logging.level.error"
        elif level == "error":
            record.rich_level_style = "logging.level.critical"
        else:
            record.rich_level_style = "logging.level.none"

    def _remove_rich_attrs(self, record: logging.LogRecord) -> None:
        """Remove the attributes set by `_add_rich_attrs` when still present."""
        for name in ("rich_escaped_message", "rich_level_style"):
            if hasattr(record, name):
                delattr(record, name)

    def _add_extras(self, record: logging.LogRecord) -> None:
        """Attach `extras` and every rich-specific temporary attribute."""
        super()._add_extras(record)
        self._add_rich_attrs(record)
        record.rich_escaped_extras = rich_markup_escape(record.extras)  # type: ignore

    def _remove_extras(self, record: logging.LogRecord) -> None:
        """Remove everything `_add_extras` attached to the record."""
        self._remove_rich_attrs(record)
        delattr(record, "rich_escaped_extras")
        super()._remove_extras(record)

    def format(self, record: logging.LogRecord) -> str:
        """Format *record* with the rich placeholders always available.

        `HumanFormatter.format` only calls `_add_extras` when `fmt` contains
        `{extras}`; the message/level placeholders are set here so that a
        custom `fmt` without `{extras}` can still use them.
        """
        self._add_rich_attrs(record)
        try:
            return super().format(record)
        finally:
            # no-op when the parent already cleaned up through _remove_extras
            self._remove_rich_attrs(record)

    def formatException(self, ei):  # noqa: N802
        # NOTE(review): presumably the traceback is rendered elsewhere by the
        # rich-based output; confirm before changing.
        return ""

    def formatStack(self, stack_info):  # noqa: N802
        # Same as formatException: the stack text is not added to the message.
        return ""

280 

281 

def json_formatter_default_extra_key_rename_fn(key: str) -> str | None:
    """Default "extra_key_rename" function: drop one leading underscore.

    Only the first underscore is removed (`"__a"` becomes `"_a"`); keys
    without a leading underscore are returned unchanged.
    """
    return key[1:] if key.startswith("_") else key

287 

288 

@dataclass
class LogFmtFormatter(Formatter):
    """Formatter producing `key=value` lines from a format string.

    Each placeholder value of `fmt` is encoded with `logfmt_format_value`.
    Exception and stack texts are passed as additional key/values (named by
    `exc_info_key` / `stack_info_key`) when building the `{extras}`
    placeholder, which is only built when `fmt` contains it.
    """

    exc_info_key: str | None = "exc_info"
    stack_info_key: str | None = "stack_info"

    def __post_init__(self):
        """Apply the logfmt defaults, then run the base initialization."""
        if self.fmt is None:
            self.fmt = DEFAULT_STLOG_LOGFMT_FORMAT
        if self.kv_formatter is None:
            self.kv_formatter = LogFmtKVFormatter(prefix=" ", suffix="")
        if self.extra_key_max_length is None:
            # 0 is kept as-is by the base class (which only replaces None)
            self.extra_key_max_length = 0
        super().__post_init__()

    def format(self, record: logging.LogRecord) -> str:
        """Render *record* through `fmt` with logfmt-encoded values."""
        record.message = record.getMessage()
        if self.usesTime():
            record.asctime = self.formatTime(record, self.datefmt)

        placeholders = self.placeholders_in_fmt
        values: dict[str, Any] = {}
        for name in placeholders:
            if name == "extras":
                continue
            values[name] = logfmt_format_value(getattr(record, name))

        # exception / stack texts travel with the extras key/values
        error_kvs: dict[str, Any] = {}
        if self.exc_info_key:
            if record.exc_info:
                error_kvs[self.exc_info_key] = self.formatException(record.exc_info)
            elif record.exc_text:
                error_kvs[self.exc_info_key] = record.exc_text
        if self.stack_info_key and record.stack_info:
            error_kvs[self.stack_info_key] = self.formatStack(record.stack_info)

        if "extras" in placeholders:
            values["extras"] = self._make_extras_string(record, extra_kvs=error_kvs)
        return format_string(self.fmt, self.style, values)

325 

326 

@dataclass
class JsonFormatter(Formatter):
    """Formatter for a JSON / parsing friendly output.

    `fmt` is a JSON template: every placeholder is replaced by the JSON
    encoding of the matching record attribute, and the rendered template is
    parsed into an object that is then completed and serialized again.

    Attributes:
        indent: `indent` argument given to `json.dumps` for the final output.
        sort_keys: `sort_keys` argument given to `json.dumps` for the final output.
        include_extras_in_key: where extras go in the final object: "" merges
            them at the root (without overriding existing keys), any other
            string stores them under that key, None leaves them out.
        exc_info_key: key receiving the exception text (falsy to disable).
        stack_info_key: key receiving the stack text (falsy to disable).
    """

    indent: int | None = None
    sort_keys: bool = True
    include_extras_in_key: str | None = ""
    exc_info_key: str | None = "exc_info"
    stack_info_key: str | None = "stack_info"

    def __post_init__(self):
        """Apply the JSON defaults, then run the base initialization."""
        if self.fmt is None:
            self.fmt = DEFAULT_STLOG_JSON_FORMAT
        if self.extra_key_rename_fn is None:
            self.extra_key_rename_fn = json_formatter_default_extra_key_rename_fn
        if self.extra_key_max_length is None:
            self.extra_key_max_length = 0
        if self.kv_formatter is None:
            # note: no need to sort/indent here as it will be sorted/indented at this level
            self.kv_formatter = JsonKVFormatter(sort_keys=False, indent=False)
        super().__post_init__()

    def json_serialize(self, message_dict: dict[str, Any]) -> str:
        """Serialize the final object with this formatter's JSON options."""
        dump_options: dict[str, Any] = {
            "indent": self.indent,
            "sort_keys": self.sort_keys,
            "default": _truncate_serialize,
        }
        return json.dumps(message_dict, **dump_options)

    def format(self, record: logging.LogRecord) -> str:
        """Render *record* as a JSON document."""
        record.message = record.getMessage()
        if self.usesTime():
            record.asctime = self.formatTime(record, self.datefmt)

        # JSON-encode each value so the rendered template is itself valid JSON
        encoded: dict[str, Any] = {}
        for name in self.placeholders_in_fmt:
            if name != "extras":
                encoded[name] = json.dumps(getattr(record, name))
        rendered = format_string(self.fmt, self.style, encoded)
        obj = json.loads(rendered)

        target_key = self.include_extras_in_key
        if target_key is not None:
            extras_str = self._make_extras_string(record)
            if extras_str:
                extras_obj = json.loads(extras_str)
                if target_key == "":
                    # root merge: keys coming from the template win
                    for name, value in extras_obj.items():
                        if name not in obj:
                            obj[name] = value
                else:
                    obj[target_key] = extras_obj

        if self.exc_info_key:
            if record.exc_info:
                obj[self.exc_info_key] = self.formatException(record.exc_info)
            elif record.exc_text:
                obj[self.exc_info_key] = record.exc_text
        if self.stack_info_key and record.stack_info:
            obj[self.stack_info_key] = self.formatStack(record.stack_info)
        return self.json_serialize(obj)