Coverage for stlog/formatter.py: 90%
208 statements
« prev ^ index » next coverage.py v7.2.7, created at 2024-08-21 07:31 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2024-08-21 07:31 +0000
1from __future__ import annotations
3import fnmatch
4import json
5import logging
6import re
7import time
8from dataclasses import dataclass, field
9from typing import Any, Callable, Sequence
11from stlog.base import (
12 GLOBAL_LOGGING_CONFIG,
13 STLOG_EXTRA_KEY,
14 format_string,
15 logfmt_format_value,
16 parse_format,
17 rich_markup_escape,
18)
19from stlog.kvformatter import (
20 JsonKVFormatter,
21 KVFormatter,
22 LogFmtKVFormatter,
23 _truncate_serialize,
24 _truncate_str,
25)
# Default format strings. They are written for the "{" style: `{name}` is a
# placeholder, `{{` / `}}` are literal braces.

# Fallback `fmt` of HumanFormatter.
DEFAULT_STLOG_HUMAN_FORMAT = "{asctime} {name} [{levelname:^10s}] {message}{extras}"
# Fallback `fmt` of RichHumanFormatter. The `rich_level_style` and
# `rich_escaped_message` placeholders are filled by that formatter.
DEFAULT_STLOG_RICH_HUMAN_FORMAT = ":arrow_forward: [log.time]{asctime}[/log.time] {name} [{rich_level_style}]{levelname:^8s}[/{rich_level_style}] [bold]{rich_escaped_message}[/bold]{extras}"
# Fallback `fmt` of LogFmtFormatter.
DEFAULT_STLOG_LOGFMT_FORMAT = (
    "time={asctime} logger={name} level={levelname} message={message}{extras}"
)
# Fallback `fmt` of JsonFormatter. JsonFormatter substitutes every placeholder
# with an already JSON-encoded value and then parses the result with
# `json.loads`, so the template itself must be a valid JSON document skeleton.
DEFAULT_STLOG_JSON_FORMAT = """
{{
 "time": {asctime},
 "logger": {name},
 "level": {levelname},
 "message": {message},
 "source": {{
 "path": {pathname},
 "lineno": {lineno},
 "module": {module},
 "funcName": {funcName},
 "process": {process},
 "processName": {processName},
 "thread": {thread},
 "threadName": {threadName}
 }}
}}
"""
# Same structure as DEFAULT_STLOG_JSON_FORMAT but with "timestamp" / "severity"
# keys instead of "time" / "level".
# NOTE(review): presumably meant for Google Cloud structured logging; it is not
# referenced in the visible part of this file - confirm against callers.
DEFAULT_STLOG_GCP_JSON_FORMAT = """
{{
 "timestamp": {asctime},
 "logger": {name},
 "severity": {levelname},
 "message": {message},
 "source": {{
 "path": {pathname},
 "lineno": {lineno},
 "module": {module},
 "funcName": {funcName},
 "process": {process},
 "processName": {processName},
 "thread": {thread},
 "threadName": {threadName}
 }}
}}
"""
# Default `datefmt` of Formatter (used to render the `{asctime}` placeholder).
DEFAULT_STLOG_DATE_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
71def _unit_tests_converter(val: float | None) -> time.struct_time:
72 # always the same value
73 return time.gmtime(1680101317)
@dataclass
class Formatter(logging.Formatter):
    """Abstract base class for `stlog` formatters.

    Attributes:
        fmt: the default format for the formatter.
        datefmt: the format to use for `{asctime}` placeholder.
        style: can be '%', '{' or '$' to select how the format string will be merged with its data
            (see https://docs.python.org/3/library/logging.html#logging.Formatter for details)
        include_extras_keys_fnmatchs: fnmatch patterns list for including keys in `{extras}` placeholder.
        exclude_extras_keys_fnmatchs: fnmatch patterns list for excluding keys in `{extras}` placeholder.
        extra_key_rename_fn: callable which takes a key name and returns a renamed key to use
            (or None to ignore the key/value).
        extra_key_max_length: maximum length of extra keys included in `{extras}` placeholder
            (after this limit, the key will be truncated and ... will be added at the end,
            0 means "no limit"; `None` is replaced by 32 in this base class).
        converter: time converter function (use `time.gmtime` (default) for UTC date/times, use
            `time.localtime` for local date/times), if you change the default, please change
            also `datefmt` keyword.
        kv_formatter: key values special formatter for formatting `{extras}`
            placeholder.
        include_reserved_attrs_in_extras: automatically include some reserved
            logrecord attributes in "extras" (example: `["process", "thread"]`).

    """

    fmt: str | None = None
    datefmt: str | None = DEFAULT_STLOG_DATE_FORMAT
    style: str = "{"
    include_extras_keys_fnmatchs: Sequence[str] | None = None
    exclude_extras_keys_fnmatchs: Sequence[str] | None = None
    extra_key_rename_fn: Callable[[str], str | None] | None = None
    extra_key_max_length: int | None = None
    converter: Callable[[float | None], time.struct_time] = time.gmtime
    kv_formatter: KVFormatter | None = None
    include_reserved_attrs_in_extras: Sequence[str] = field(default_factory=list)
    # Lazily computed cache for the `placeholders_in_fmt` property.
    _placeholders_in_fmt: list[str] | None = field(
        init=False, default=None, repr=False, compare=False
    )

    def __post_init__(self):
        """Initialise the `logging.Formatter` part and compile the key filters."""
        super().__init__(  # explicit call because logging.Formatter is not a dataclass
            fmt=self.fmt, datefmt=self.datefmt, style=self.style  # type: ignore
        )
        if self.extra_key_max_length is None:
            self.extra_key_max_length = 32
        self.include_extra_keys_patterns: list[re.Pattern] = [
            re.compile(fnmatch.translate("*"))
        ]  # all by default
        self.exclude_extra_keys_patterns: list[re.Pattern] = []  # empty by default
        if self.include_extras_keys_fnmatchs is not None:
            self.include_extra_keys_patterns = [
                re.compile(fnmatch.translate(x))
                for x in self.include_extras_keys_fnmatchs
            ]
        if self.exclude_extras_keys_fnmatchs is not None:
            self.exclude_extra_keys_patterns = [
                re.compile(fnmatch.translate(x))
                for x in self.exclude_extras_keys_fnmatchs
            ]
        if GLOBAL_LOGGING_CONFIG._unit_tests_mode:
            # Freeze the clock so that rendered timestamps are reproducible.
            self.converter = _unit_tests_converter

    @property
    def placeholders_in_fmt(self) -> list[str]:
        """Placeholder names found in `fmt` (parsed once, then cached)."""
        if self._placeholders_in_fmt is None:
            self._placeholders_in_fmt = parse_format(self.fmt, self.style)
        return self._placeholders_in_fmt

    def _make_extras_string(
        self, record: logging.LogRecord, extra_kvs: dict[str, Any] | None = None
    ) -> str:
        """Build the string substituted for the `{extras}` placeholder.

        Args:
            record: the log record being formatted.
            extra_kvs: additional key/values merged last (they override
                record keys with the same name and are not renamed/filtered).

        Returns:
            The output of `kv_formatter.format()`, or "" when there is no
            `kv_formatter` or when the record carries no stlog extras and
            there is nothing else to emit.
        """
        if self.kv_formatter is None:
            return ""
        has_stlog_extras = hasattr(record, STLOG_EXTRA_KEY)
        # Records that do not carry the stlog marker attribute (for example
        # records emitted by a plain `logging` logger) contribute no keys of
        # their own, but reserved attributes and `extra_kvs` must still be
        # emitted: returning early here used to drop them silently.
        candidate_keys = list(getattr(record, STLOG_EXTRA_KEY, ())) + list(
            self.include_reserved_attrs_in_extras
        )
        kvs: dict[str, Any] = {}
        for k in candidate_keys:
            key = self._make_extra_key_name(k)
            if key:
                kvs[key] = getattr(record, k)
        if extra_kvs:
            kvs.update(extra_kvs)
        if not kvs and not has_stlog_extras:
            # Nothing to emit for a foreign record: keep the historical "".
            return ""
        return self.kv_formatter.format(kvs)

    def _make_extra_key_name(self, extra_key: str) -> str | None:
        """Rename, filter and truncate an extra key.

        Returns:
            The final key name, or None when the key must be ignored (renamed
            to None, not matched by any include pattern, or matched by an
            exclude pattern). Patterns are applied to the *renamed* key.
        """
        new_extra_key: str | None = extra_key
        if self.extra_key_rename_fn is not None:
            new_extra_key = (self.extra_key_rename_fn)(extra_key)
        if new_extra_key is None:
            return None
        if not any(p.match(new_extra_key) for p in self.include_extra_keys_patterns):
            return None
        if any(p.match(new_extra_key) for p in self.exclude_extra_keys_patterns):
            return None
        return _truncate_str(
            new_extra_key,
            self.extra_key_max_length if self.extra_key_max_length is not None else 0,
        )

    def format(self, record: logging.LogRecord) -> str:
        """Format `record` with `logging.Formatter.format()`.

        In unit-tests mode, environment dependent record fields are first
        overwritten (in place) with fixed values.
        """
        if GLOBAL_LOGGING_CONFIG._unit_tests_mode:
            # FIXME: it would be better as a Filter, wouldn't be?
            # fix some fields in record to get always the same values
            record.filename = "filename.py"
            record.pathname = "/path/filename.py"
            record.thread = 12345
            record.process = 6789
            record.processName = "MainProcess"
            record.threadName = "MainThread"
        return super().format(record)
# Adapted from https://github.com/Mergifyio/daiquiri/blob/main/daiquiri/formatter.py
@dataclass
class HumanFormatter(Formatter):
    """Formatter for a "human" output.

    Extra keywords are merged into a `{extras}` placeholder by a `stlog.kvformatter.KVFormatter`.

    If you use this placeholder on your `fmt`, any keywords
    passed to a logging call will be formatted into a "extras" string and
    included in a logging message.

    Example::

        logger.info("my message", foo="bar", foo2=123)

    will cause an "extras" string of::

        {foo=bar foo2=123}

    You can change the way the `{extras}` placeholder is formatted
    by providing a KVFormatter object.

    """

    def __post_init__(self):
        """Apply the human-output defaults, then run the base initialisation."""
        if self.fmt is None:
            self.fmt = DEFAULT_STLOG_HUMAN_FORMAT
        if self.kv_formatter is None:
            self.kv_formatter = LogFmtKVFormatter()
        if self.exclude_extras_keys_fnmatchs is None:
            # By default, hide keys starting with an underscore.
            self.exclude_extras_keys_fnmatchs = ("_*",)
        super().__post_init__()

    def _add_extras(self, record: logging.LogRecord) -> None:
        """Attach the rendered extras string to `record` as `record.extras`."""
        record.extras = self._make_extras_string(record)

    def _remove_extras(self, record: logging.LogRecord) -> None:
        """Remove the temporary `extras` attribute set by `_add_extras()`."""
        delattr(record, "extras")

    def format(self, record: logging.LogRecord) -> str:
        """Format `record`, providing `{extras}` when `fmt` uses it.

        The `extras` attribute only lives on the record for the duration of
        the call; it is removed even if formatting raises, so a failure here
        does not leak the attribute to other handlers/formatters sharing the
        same record.
        """
        if "extras" not in self.placeholders_in_fmt:
            return super().format(record)
        self._add_extras(record)
        try:
            return super().format(record)
        finally:
            self._remove_extras(record)
@dataclass
class RichHumanFormatter(HumanFormatter):
    """`HumanFormatter` variant producing `rich` console markup.

    In addition to `{extras}`, the format string may use the
    `{rich_escaped_message}`, `{rich_escaped_extras}` and `{rich_level_style}`
    placeholders, which are attached to the record while it is formatted.
    """

    def __post_init__(self):
        """Apply the rich-output defaults, then run the parent initialisation."""
        if self.kv_formatter is None:
            self.kv_formatter = LogFmtKVFormatter(
                prefix="\n :arrow_right_hook: ",
                suffix="",
                template="[repr.attrib_name]{key}[/repr.attrib_name][repr.attrib_equal]=[/repr.attrib_equal][repr.attrib_value]{value}[/repr.attrib_value]",
            )
        if self.fmt is None:
            self.fmt = DEFAULT_STLOG_RICH_HUMAN_FORMAT
        super().__post_init__()

    def _add_extras(self, record: logging.LogRecord) -> None:
        """Attach `extras` plus the rich-specific attributes to `record`."""
        super()._add_extras(record)
        record.rich_escaped_message = rich_markup_escape(record.getMessage())
        record.rich_escaped_extras = rich_markup_escape(record.extras)  # type: ignore
        level = record.levelname.lower()
        if level in ("notset", "debug", "info", "critical"):
            record.rich_level_style = f"logging.level.{level}"
        else:
            # NOTE(review): "warning" and "error" each use the style of the
            # next level up; this looks like a deliberate visual choice - confirm.
            record.rich_level_style = {
                "warning": "logging.level.error",
                "error": "logging.level.critical",
            }.get(level, "logging.level.none")

    def _remove_extras(self, record: logging.LogRecord) -> None:
        """Remove every temporary attribute set by `_add_extras()`."""
        delattr(record, "rich_escaped_message")
        delattr(record, "rich_level_style")
        delattr(record, "rich_escaped_extras")
        super()._remove_extras(record)

    def format(self, record: logging.LogRecord) -> str:
        """Format `record`, always providing the rich-specific placeholders.

        `HumanFormatter.format()` only calls `_add_extras()` when `fmt`
        contains `{extras}`. The rich attributes are set by that same hook, so
        a custom format using e.g. `{rich_level_style}` without `{extras}`
        would otherwise fail on the missing attribute. In that case the hook
        is driven from here instead.
        """
        if "extras" in self.placeholders_in_fmt:
            # The parent class adds/removes the attributes itself.
            return super().format(record)
        self._add_extras(record)
        try:
            return super().format(record)
        finally:
            self._remove_extras(record)

    def formatException(self, ei):  # noqa: N802
        """Return "" so that no traceback text is appended to the message.

        NOTE(review): presumably the traceback is rendered elsewhere (by the
        rich-aware handler) - confirm.
        """
        return ""

    def formatStack(self, stack_info):  # noqa: N802
        """Return "" so that no stack text is appended to the message."""
        return ""
def json_formatter_default_extra_key_rename_fn(key: str) -> str | None:
    """Simple "extra_key_rename" function dropping one leading underscore.

    Only the first underscore is removed (`"__a"` becomes `"_a"`); keys
    without a leading underscore are returned unchanged.
    """
    return key[1:] if key.startswith("_") else key
@dataclass
class LogFmtFormatter(Formatter):
    """Formatter producing `logfmt` style lines.

    Attributes:
        exc_info_key: key under which exception text is added to the extras
            (None or "" to leave it out).
        stack_info_key: key under which stack text is added to the extras
            (None or "" to leave it out).
    """

    exc_info_key: str | None = "exc_info"
    stack_info_key: str | None = "stack_info"

    def __post_init__(self):
        """Apply the logfmt defaults, then run the base initialisation."""
        if self.fmt is None:
            self.fmt = DEFAULT_STLOG_LOGFMT_FORMAT
        if self.extra_key_max_length is None:
            # 0 = do not truncate keys
            self.extra_key_max_length = 0
        if self.kv_formatter is None:
            self.kv_formatter = LogFmtKVFormatter(prefix=" ", suffix="")
        super().__post_init__()

    def format(self, record: logging.LogRecord) -> str:
        """Render `record` by filling `fmt` with logfmt-encoded values.

        Every placeholder except `extras` is read from the record and passed
        through `logfmt_format_value()`. Exception and stack information are
        only emitted through the `{extras}` placeholder.
        """
        record.message = record.getMessage()
        if self.usesTime():
            record.asctime = self.formatTime(record, self.datefmt)

        placeholders = self.placeholders_in_fmt
        values: dict[str, Any] = {}
        for name in placeholders:
            if name == "extras":
                continue
            values[name] = logfmt_format_value(getattr(record, name))

        # Exception / stack details travel with the extras key/values.
        additional: dict[str, Any] = {}
        exc_key = self.exc_info_key
        if exc_key and record.exc_info:
            additional[exc_key] = self.formatException(record.exc_info)
        elif exc_key and record.exc_text:
            additional[exc_key] = record.exc_text
        stack_key = self.stack_info_key
        if stack_key and record.stack_info:
            additional[stack_key] = self.formatStack(record.stack_info)

        if "extras" in placeholders:
            values["extras"] = self._make_extras_string(record, extra_kvs=additional)
        return format_string(self.fmt, self.style, values)
@dataclass
class JsonFormatter(Formatter):
    """Formatter for a JSON / parsing friendly output.

    Attributes:
        indent: `indent` argument given to `json.dumps` for the final output.
        sort_keys: `sort_keys` argument given to `json.dumps` for the final output.
        include_extras_in_key: where extras go in the output object: "" merges
            them at the root (keys already present are kept as is), any other
            string nests them under that key, None leaves them out.
        exc_info_key: key receiving the exception text (None or "" to leave it out).
        stack_info_key: key receiving the stack text (None or "" to leave it out).
    """

    indent: int | None = None
    sort_keys: bool = True
    include_extras_in_key: str | None = ""
    exc_info_key: str | None = "exc_info"
    stack_info_key: str | None = "stack_info"

    def __post_init__(self):
        """Apply the JSON defaults, then run the base initialisation."""
        if self.fmt is None:
            self.fmt = DEFAULT_STLOG_JSON_FORMAT
        if self.extra_key_rename_fn is None:
            self.extra_key_rename_fn = json_formatter_default_extra_key_rename_fn
        if self.extra_key_max_length is None:
            # 0 = do not truncate keys
            self.extra_key_max_length = 0
        if self.kv_formatter is None:
            # note: no need to sort/indent here as it will be sorted/indented at this level
            # NOTE(review): `indent=False` is passed as is; the JsonKVFormatter
            # signature is not visible here - confirm it is the intended value.
            self.kv_formatter = JsonKVFormatter(sort_keys=False, indent=False)
        super().__post_init__()

    def json_serialize(self, message_dict: dict[str, Any]) -> str:
        """Serialise the final output object with this formatter's options."""
        dump_options = {
            "indent": self.indent,
            "sort_keys": self.sort_keys,
            "default": _truncate_serialize,
        }
        return json.dumps(message_dict, **dump_options)

    def format(self, record: logging.LogRecord) -> str:
        """Render `record` as a JSON document.

        `fmt` is filled with JSON-encoded record attributes and parsed back
        into an object; extras, exception and stack information are then
        added to that object before the final serialisation.
        """
        record.message = record.getMessage()
        if self.usesTime():
            record.asctime = self.formatTime(record, self.datefmt)

        serialized: dict[str, Any] = {}
        for name in self.placeholders_in_fmt:
            if name != "extras":
                serialized[name] = json.dumps(getattr(record, name))
        payload = json.loads(format_string(self.fmt, self.style, serialized))

        target_key = self.include_extras_in_key
        if target_key is not None:
            extras_json = self._make_extras_string(record)
            if extras_json:
                extras = json.loads(extras_json)
                if target_key == "":
                    # Merge at the root without overriding existing keys.
                    for key, value in extras.items():
                        payload.setdefault(key, value)
                else:
                    payload[target_key] = extras

        exc_key = self.exc_info_key
        if exc_key and record.exc_info:
            payload[exc_key] = self.formatException(record.exc_info)
        elif exc_key and record.exc_text:
            payload[exc_key] = record.exc_text
        stack_key = self.stack_info_key
        if stack_key and record.stack_info:
            payload[stack_key] = self.formatStack(record.stack_info)
        return self.json_serialize(payload)