Coverage for stlog/formatter.py: 91%

220 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-06-26 07:52 +0000

1from __future__ import annotations 

2 

3import fnmatch 

4import json 

5import logging 

6import re 

7import time 

8from dataclasses import dataclass, field 

9from typing import Any, Callable, Sequence 

10 

11from stlog.base import ( 

12 GLOBAL_LOGGING_CONFIG, 

13 STLOG_EXTRA_KEY, 

14 format_string, 

15 logfmt_format_value, 

16 parse_format, 

17 rich_markup_escape, 

18) 

19from stlog.kvformatter import ( 

20 JsonKVFormatter, 

21 KVFormatter, 

22 LogFmtKVFormatter, 

23 _truncate_serialize, 

24 _truncate_str, 

25) 

26 

27DEFAULT_STLOG_HUMAN_FORMAT = "{asctime} {name} [{levelname:^10s}] {message}{extras}" 

28DEFAULT_STLOG_RICH_HUMAN_FORMAT = ":arrow_forward: [log.time]{asctime}[/log.time] {name} [{rich_level_style}]{levelname:^8s}[/{rich_level_style}] [bold]{rich_escaped_message}[/bold]{extras}" 

29DEFAULT_STLOG_LOGFMT_FORMAT = ( 

30 "time={asctime} logger={name} level={levelname} message={message}{extras}" 

31) 

32DEFAULT_STLOG_JSON_FORMAT = """ 

33{{ 

34 "time": {asctime}, 

35 "logger": {name}, 

36 "level": {levelname}, 

37 "message": {message}, 

38 "source": {{ 

39 "path": {pathname}, 

40 "lineno": {lineno}, 

41 "module": {module}, 

42 "funcName": {funcName}, 

43 "process": {process}, 

44 "processName": {processName}, 

45 "thread": {thread}, 

46 "threadName": {threadName} 

47 }} 

48}} 

49""" 

50DEFAULT_STLOG_GCP_JSON_FORMAT = """ 

51{{ 

52 "timestamp": {asctime}, 

53 "logger": {name}, 

54 "severity": {levelname}, 

55 "message": {message}, 

56 "source": {{ 

57 "path": {pathname}, 

58 "lineno": {lineno}, 

59 "module": {module}, 

60 "funcName": {funcName}, 

61 "process": {process}, 

62 "processName": {processName}, 

63 "thread": {thread}, 

64 "threadName": {threadName} 

65 }} 

66}} 

67""" 

68DEFAULT_STLOG_DATE_FORMAT_HUMAN = "%Y-%m-%dT%H:%M:%SZ" 

69DEFAULT_STLOG_DATE_FORMAT_JSON = "%Y-%m-%dT%H:%M:%S.%fZ" 

70DEFAULT_STLOG_DATE_FORMAT = DEFAULT_STLOG_DATE_FORMAT_HUMAN # deprecated 

71 

72 

73def _unit_tests_converter(val: float | None) -> time.struct_time: 

74 # always the same value 

75 return time.gmtime(1680101317) 

76 

77 

78@dataclass 

79class Formatter(logging.Formatter): 

80 """Abstract base class for `stlog` formatters. 

81 

82 Attributes: 

83 fmt: the default format for the formatter. 

84 datefmt: the format to use for `{asctime}` placeholder. 

85 style: can be '%', '{' or '$' to select how the format string will be merged with its data 

86 (see https://docs.python.org/3/library/logging.html#logging.Formatter for details) 

87 include_extras_keys_fnmatchs: fnmatch patterns list for including keys in `{extras}` placeholder. 

88 exclude_extras_keys_fnmatchs: fnmatch patterns list for excluding keys in `{extras}` placeholder. 

89 extra_key_rename_fn: callable which takes a key name and return a renamed key to use 

90 (or None to ignore the key/value). 

91 extra_key_max_length: maximum size of extra keys to be included in `{extras}` placeholder 

92 (after this limit, the value will be truncated and ... will be added at the end, 0 means "no limit"). 

93 converter: time converter function (use `time.gmtime` (default) for UTC date/times, use `time.time` 

94 for local date/times), if you change the default, please change also `datefmt` keyword. 

95 kv_formatter: key values special formatter for formatting `{extras}` 

96 placeholder. 

97 include_reserved_attrs_in_extras: automatical include some reserved 

98 logrecord attributes in "extras" (example: `["process", "thread"]`). 

99 

100 """ 

101 

102 fmt: str | None = None 

103 datefmt: str | None = None 

104 style: str = "{" 

105 include_extras_keys_fnmatchs: Sequence[str] | None = None 

106 exclude_extras_keys_fnmatchs: Sequence[str] | None = None 

107 extra_key_rename_fn: Callable[[str], str | None] | None = None 

108 extra_key_max_length: int | None = None 

109 converter: Callable[[float | None], time.struct_time] = time.gmtime 

110 kv_formatter: KVFormatter | None = None 

111 include_reserved_attrs_in_extras: Sequence[str] = field(default_factory=list) 

112 _placeholders_in_fmt: list[str] | None = field( 

113 init=False, default=None, repr=False, compare=False 

114 ) 

115 

116 def __post_init__(self): 

117 if self.datefmt is None: 

118 self.datefmt = DEFAULT_STLOG_DATE_FORMAT_HUMAN 

119 super().__init__( # explicit call because logging.Formatter is not a dataclass 

120 fmt=self.fmt, 

121 datefmt=self.datefmt, 

122 style=self.style, # type: ignore 

123 ) 

124 if self.extra_key_max_length is None: 

125 self.extra_key_max_length = 32 

126 self.include_extra_keys_patterns: list[re.Pattern] = [ 

127 re.compile(fnmatch.translate("*")) 

128 ] # all by default 

129 self.exclude_extra_keys_patterns: list[re.Pattern] = [] # empty by default 

130 if self.include_extras_keys_fnmatchs is not None: 

131 self.include_extra_keys_patterns = [ 

132 re.compile(fnmatch.translate(x)) 

133 for x in self.include_extras_keys_fnmatchs 

134 ] 

135 if self.exclude_extras_keys_fnmatchs is not None: 

136 self.exclude_extra_keys_patterns = [ 

137 re.compile(fnmatch.translate(x)) 

138 for x in self.exclude_extras_keys_fnmatchs 

139 ] 

140 if GLOBAL_LOGGING_CONFIG._unit_tests_mode: 

141 self.converter = _unit_tests_converter 

142 

143 @property 

144 def placeholders_in_fmt(self) -> list[str]: 

145 if self._placeholders_in_fmt is None: 

146 self._placeholders_in_fmt = parse_format(self.fmt, self.style) 

147 return self._placeholders_in_fmt 

148 

149 def _make_extras_string( 

150 self, record: logging.LogRecord, extra_kvs: dict[str, Any] | None = None 

151 ) -> str: 

152 if self.kv_formatter is None: 

153 return "" 

154 if not hasattr(record, STLOG_EXTRA_KEY): 

155 return "" 

156 kvs: dict[str, Any] = {} 

157 for k in list(getattr(record, STLOG_EXTRA_KEY)) + list( 

158 self.include_reserved_attrs_in_extras 

159 ): 

160 key = self._make_extra_key_name(k) 

161 if key: 

162 kvs[key] = getattr(record, k) 

163 if extra_kvs: 

164 kvs.update(extra_kvs) 

165 return self.kv_formatter.format(kvs) 

166 

167 def _make_extra_key_name(self, extra_key: str) -> str | None: 

168 new_extra_key: str | None = extra_key 

169 if self.extra_key_rename_fn is not None: 

170 new_extra_key = (self.extra_key_rename_fn)(extra_key) 

171 if new_extra_key is None: 

172 return None 

173 for pattern in self.include_extra_keys_patterns: 

174 if re.match(pattern, new_extra_key): 

175 break 

176 else: 

177 # not found 

178 return None 

179 for pattern in self.exclude_extra_keys_patterns: 

180 if re.match(pattern, new_extra_key): 

181 return None 

182 return _truncate_str( 

183 new_extra_key, 

184 self.extra_key_max_length if self.extra_key_max_length is not None else 0, 

185 ) 

186 

187 def formatTime(self, record, datefmt=None): # noqa: N802 

188 # Override the standard formatTime to support %f in the datefmt 

189 # note: as strftime with %f returns nothing on alpine 

190 # we replace it first with @@@MSECS@@@ placeholder 

191 # then we replace @@@MSECS@@@ with the actual msecs value 

192 s = super().formatTime(record, datefmt=datefmt.replace("%f", "@@@MSECS@@@")) 

193 msecs = int(record.msecs) 

194 return s.replace("@@@MSECS@@@", f"{msecs:03d}") 

195 

196 def format(self, record: logging.LogRecord) -> str: 

197 if GLOBAL_LOGGING_CONFIG._unit_tests_mode: 

198 # FIXME: it would be better as a Filter, wouldn't be? 

199 # fix some fields in record to get always the same values 

200 record.filename = "filename.py" 

201 record.pathname = "/path/filename.py" 

202 record.thread = 12345 

203 record.process = 6789 

204 record.processName = "MainProcess" 

205 record.threadName = "MainThread" 

206 record.msecs = 0 

207 return super().format(record) 

208 

209 

210# Adapted from https://github.com/Mergifyio/daiquiri/blob/main/daiquiri/formatter.py 

211@dataclass 

212class HumanFormatter(Formatter): 

213 """Formatter for a "human" output. 

214 

215 Extra keywords are merged into a `{extras}` placeholder by a `stlog.kvformatter.KVFormatter`. 

216 

217 If you use this placeholder on your `fmt`, any keywords 

218 passed to a logging call will be formatted into a "extras" string and 

219 included in a logging message. 

220 

221 Example:: 

222 

223 logger.info("my message", foo="bar", foo2=123) 

224 

225 will cause an "extras" string of:: 

226 

227 {foo=bar foo2=123} 

228 

229 You can change the way the `{extras}` placeholder is formatted 

230 by providing a KVFormatter object. 

231 

232 """ 

233 

234 def __post_init__(self): 

235 if self.fmt is None: 

236 self.fmt = DEFAULT_STLOG_HUMAN_FORMAT 

237 if self.kv_formatter is None: 

238 self.kv_formatter = LogFmtKVFormatter() 

239 if self.exclude_extras_keys_fnmatchs is None: 

240 self.exclude_extras_keys_fnmatchs = ("_*",) 

241 super().__post_init__() 

242 

243 def _add_extras(self, record: logging.LogRecord) -> None: 

244 record.extras = self._make_extras_string(record) 

245 

246 def _remove_extras(self, record: logging.LogRecord) -> None: 

247 delattr(record, "extras") 

248 

249 def format(self, record: logging.LogRecord) -> str: 

250 if "extras" in self.placeholders_in_fmt: 

251 self._add_extras(record) 

252 s = super().format(record) 

253 if "extras" in self.placeholders_in_fmt: 

254 self._remove_extras(record) 

255 return s 

256 

257 

258@dataclass 

259class RichHumanFormatter(HumanFormatter): 

260 def __post_init__(self): 

261 if self.kv_formatter is None: 

262 self.kv_formatter = LogFmtKVFormatter( 

263 prefix="\n :arrow_right_hook: ", 

264 suffix="", 

265 template="[repr.attrib_name]{key}[/repr.attrib_name][repr.attrib_equal]=[/repr.attrib_equal][repr.attrib_value]{value}[/repr.attrib_value]", 

266 ) 

267 if self.fmt is None: 

268 self.fmt = DEFAULT_STLOG_RICH_HUMAN_FORMAT 

269 super().__post_init__() 

270 

271 def _add_extras(self, record: logging.LogRecord) -> None: 

272 super()._add_extras(record) 

273 record.rich_escaped_message = rich_markup_escape(record.getMessage()) 

274 record.rich_escaped_extras = rich_markup_escape(record.extras) # type: ignore 

275 level = record.levelname.lower() 

276 if level in ["notset", "debug", "info", "critical"]: 

277 record.rich_level_style = f"logging.level.{level}" 

278 elif level == "warning": 

279 record.rich_level_style = "logging.level.error" 

280 elif level == "error": 

281 record.rich_level_style = "logging.level.critical" 

282 else: 

283 record.rich_level_style = "logging.level.none" 

284 

285 def _remove_extras(self, record: logging.LogRecord) -> None: 

286 delattr(record, "rich_escaped_message") 

287 delattr(record, "rich_level_style") 

288 delattr(record, "rich_escaped_extras") 

289 super()._remove_extras(record) 

290 

291 def formatException(self, ei): # noqa: N802 

292 return "" 

293 

294 def formatStack(self, stack_info): # noqa: N802 

295 return "" 

296 

297 

298def json_formatter_default_extra_key_rename_fn(key: str) -> str | None: 

299 """Simple "extra_key_rename" function to remove leading underscores.""" 

300 if key.startswith("_"): 

301 return key[1:] 

302 return key 

303 

304 

305@dataclass 

306class LogFmtFormatter(Formatter): 

307 exc_info_key: str | None = "exc_info" 

308 stack_info_key: str | None = "stack_info" 

309 

310 def __post_init__(self): 

311 if self.kv_formatter is None: 

312 self.kv_formatter = LogFmtKVFormatter(prefix=" ", suffix="") 

313 if self.fmt is None: 

314 self.fmt = DEFAULT_STLOG_LOGFMT_FORMAT 

315 if self.extra_key_max_length is None: 

316 self.extra_key_max_length = 0 

317 super().__post_init__() 

318 

319 def format(self, record: logging.LogRecord) -> str: 

320 record.message = record.getMessage() 

321 if self.usesTime(): 

322 record.asctime = self.formatTime(record, self.datefmt) 

323 record_dict: dict[str, Any] = { 

324 k: logfmt_format_value(getattr(record, k)) 

325 for k in self.placeholders_in_fmt 

326 if k != "extras" 

327 } 

328 extra_kvs: dict[str, Any] = {} 

329 if self.exc_info_key: 

330 if record.exc_info: 

331 extra_kvs[self.exc_info_key] = self.formatException(record.exc_info) 

332 elif record.exc_text: 

333 extra_kvs[self.exc_info_key] = record.exc_text 

334 if self.stack_info_key and record.stack_info: 

335 extra_kvs[self.stack_info_key] = self.formatStack(record.stack_info) 

336 if "extras" in self.placeholders_in_fmt: 

337 record_dict["extras"] = self._make_extras_string( 

338 record, extra_kvs=extra_kvs 

339 ) 

340 return format_string(self.fmt, self.style, record_dict) 

341 

342 

343@dataclass 

344class JsonFormatter(Formatter): 

345 """Formatter for a JSON / parsing friendly output.""" 

346 

347 indent: int | None = None 

348 sort_keys: bool = True 

349 include_extras_in_key: str | None = "" 

350 exc_info_key: str | None = "exc_info" 

351 stack_info_key: str | None = "stack_info" 

352 

353 def __post_init__(self): 

354 if self.datefmt is None: 

355 self.datefmt = DEFAULT_STLOG_DATE_FORMAT_JSON 

356 if self.extra_key_max_length is None: 

357 self.extra_key_max_length = 0 

358 if self.kv_formatter is None: 

359 # note: no need to sort/indent here as it will be sorted/indented at this level 

360 self.kv_formatter = JsonKVFormatter(sort_keys=False, indent=False) 

361 if self.fmt is None: 

362 self.fmt = DEFAULT_STLOG_JSON_FORMAT 

363 if self.extra_key_rename_fn is None: 

364 self.extra_key_rename_fn = json_formatter_default_extra_key_rename_fn 

365 super().__post_init__() 

366 

367 def json_serialize(self, message_dict: dict[str, Any]) -> str: 

368 return json.dumps( 

369 message_dict, 

370 indent=self.indent, 

371 sort_keys=self.sort_keys, 

372 default=_truncate_serialize, 

373 ) 

374 

375 def format(self, record: logging.LogRecord) -> str: 

376 record.message = record.getMessage() 

377 if self.usesTime(): 

378 record.asctime = self.formatTime(record, self.datefmt) 

379 record_dict: dict[str, Any] = { 

380 k: json.dumps(getattr(record, k)) 

381 for k in self.placeholders_in_fmt 

382 if k != "extras" 

383 } 

384 s = format_string(self.fmt, self.style, record_dict) 

385 obj = json.loads(s) 

386 if self.include_extras_in_key is not None: 

387 extras_str = self._make_extras_string(record) 

388 if extras_str: 

389 extras_obj = json.loads(extras_str) 

390 if self.include_extras_in_key == "": 

391 for key, value in extras_obj.items(): 

392 if key not in obj: 

393 obj[key] = value 

394 else: 

395 obj[self.include_extras_in_key] = extras_obj 

396 if self.exc_info_key: 

397 if record.exc_info: 

398 obj[self.exc_info_key] = self.formatException(record.exc_info) 

399 elif record.exc_text: 

400 obj[self.exc_info_key] = record.exc_text 

401 if self.stack_info_key and record.stack_info: 

402 obj[self.stack_info_key] = self.formatStack(record.stack_info) 

403 return self.json_serialize(obj)