Coverage for stlog/base.py: 87%

139 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2024-08-21 07:31 +0000

1from __future__ import annotations 

2 

3import inspect 

4import json 

5import numbers 

6import os 

7import re 

8import string 

9import types 

10from dataclasses import dataclass, field 

11from string import Template 

12from typing import Any, Callable, Match 

13 

# Reserved attribute name specific to stlog.
STLOG_EXTRA_KEY = "_stlog_extra"

# rich is an optional dependency: record whether its traceback renderer
# could be imported.
try:
    from rich.traceback import Traceback
except ImportError:
    RICH_AVAILABLE = False
else:
    RICH_AVAILABLE = True


# Lowercase spellings recognised as boolean true / false.
TRUE_VALUES = ("1", "true", "yes")
FALSE_VALUES = ("0", "false", "no")

# Characters a logfmt string may contain without having to be quoted.
ALLOWED_CHARS_WITHOUT_LOGFMT_QUOTING: set = set(
    f"{string.ascii_letters}{string.digits},-.@_~:"
)

29 

30 

class StlogError(Exception):
    """Exception type raised by the stlog helpers defined in this module."""

33 

34 

@dataclass
class GlobalLoggingConfig:
    """Container for the global stlog settings.

    A single module-level instance, ``GLOBAL_LOGGING_CONFIG``, is created
    right after the class definition.
    """

    setup: bool = False
    # Defaults to the base name of the file that owns the last entry of
    # ``inspect.stack()`` (the outermost frame of the current call stack).
    program_name: str = field(
        default_factory=lambda: os.path.basename(inspect.stack()[-1].filename)
    )
    reinject_context_in_standard_logging: bool | None = None
    read_extra_kwargs_from_standard_logging: bool | None = None
    # Computed once, when the class body is executed, from the
    # STLOG_UNIT_TESTS_MODE environment variable ("0" when unset).
    _unit_tests_mode: bool = (
        os.getenv("STLOG_UNIT_TESTS_MODE", "0").lower() in TRUE_VALUES
    )


GLOBAL_LOGGING_CONFIG = GlobalLoggingConfig()

49 

# Reserved names: natural LogRecord attributes to skip, followed by the
# names that are specific to stlog.
# http://docs.python.org/library/logging.html#logrecord-attributes
# Stolen from https://github.com/madzak/python-json-logger/blob/master/src/pythonjsonlogger/jsonlogger.py
RESERVED_ATTRS: tuple[str, ...] = (
    # natural LogRecord attributes
    "args",
    "asctime",
    "created",
    "exc_info",
    "exc_text",
    "filename",
    "funcName",
    "levelname",
    "levelno",
    "lineno",
    "module",
    "msecs",
    "message",
    "msg",
    "name",
    "pathname",
    "process",
    "processName",
    "relativeCreated",
    "stack_info",
    "thread",
    "threadName",
) + (
    # specific to stlog
    "extra",
    "extras",
    STLOG_EXTRA_KEY,
    "rich_escaped_message",
    "rich_escaped_extras",
    "rich_level_style",
)

83 

84 

def check_json_types_or_raise(to_check: Any) -> None:
    """Recursively verify that ``to_check`` only contains JSON-friendly types.

    Accepted values are ``None``, ``bool``, ``str``, ``int``, ``float`` and
    ``dict`` / ``list`` / ``tuple`` containers of accepted values. Dict keys
    must be ``str``.

    Raises:
        StlogError: if a value (at any depth) has another type, or if a dict
            key is not a ``str``.
    """
    if to_check is None:
        return

    if isinstance(to_check, dict):
        for name, item in to_check.items():
            if not isinstance(name, str):
                raise StlogError("dict keys should be str, found %s" % type(name))
            check_json_types_or_raise(item)
        return

    if isinstance(to_check, (list, tuple)):
        for element in to_check:
            check_json_types_or_raise(element)
        return

    # Only scalars are left at this point.
    if not isinstance(to_check, (bool, str, int, float)):
        raise StlogError(
            "to_check should be a dict/tuple/list/bool/str/int/float/bool/None, found %s"
            % type(to_check)
        )

101 

102 

# Adapted from https://github.com/jteppinette/python-logfmter/blob/main/logfmter/formatter.py
def logfmt_format_string(value: str) -> str:
    """Render ``value`` as a logfmt string.

    Double quotes and newlines are backslash-escaped. The result is wrapped
    in double quotes when ``value`` contains any character outside
    ``ALLOWED_CHARS_WITHOUT_LOGFMT_QUOTING``. An empty string is rendered
    as ``""``.
    """
    # The quoting decision is taken on the raw value, before any escaping.
    must_quote = not ALLOWED_CHARS_WITHOUT_LOGFMT_QUOTING.issuperset(value)
    escaped = value.replace('"', '\\"').replace("\n", "\\n")
    if must_quote:
        return f'"{escaped}"'
    return escaped or '""'

115 

116 

# Adapted from https://github.com/jteppinette/python-logfmter/blob/main/logfmter/formatter.py
def logfmt_format_value(value: Any) -> str:
    """Render an arbitrary value for logfmt output.

    ``None`` becomes an empty string, booleans become ``true`` / ``false``,
    other numbers are rendered with ``str()``, and everything else is
    converted with ``str()`` and passed to ``logfmt_format_string``.
    """
    if value is None:
        return ""
    # bool must be tested before Number: bool is registered as a number too.
    if isinstance(value, bool):
        if value:
            return "true"
        return "false"
    if isinstance(value, numbers.Number):
        return str(value)
    text = str(value)
    return logfmt_format_string(text)

126 

127 

def _get_env_json_context() -> dict[str, Any]:
    """Return the context stored as JSON in ``STLOG_ENV_JSON_CONTEXT``.

    Returns:
        The decoded JSON object, or an empty dict when the variable is
        unset, cannot be decoded as JSON, or decodes to something that is
        not a JSON object (list, string, number...). In the last two cases
        a warning is printed and the value is ignored.
    """
    env_key = "STLOG_ENV_JSON_CONTEXT"
    env_context = os.environ.get(env_key, None)
    if env_context is None:
        return {}
    try:
        decoded = json.loads(env_context)
    except (ValueError, RecursionError):
        # json.JSONDecodeError is a ValueError; RecursionError covers
        # pathologically nested documents. Best effort: warn and ignore.
        print(
            f"WARNING: can't load {env_key} env var value as valid JSON => ignoring"
        )
        return {}
    if not isinstance(decoded, dict):
        # Valid JSON, but not usable as a context mapping: callers unpack
        # the result with ``**``, which would fail on a list or a scalar.
        print(f"WARNING: {env_key} env var value is not a JSON object => ignoring")
        return {}
    return decoded

139 

140 

def _get_env_context() -> dict[str, Any]:
    """Collect context entries from ``STLOG_ENV_CONTEXT_*`` env variables.

    The part of the variable name after the prefix, lowercased, becomes the
    key; the variable value becomes the value. A variable whose name is
    exactly the prefix (nothing after it) is skipped.
    """
    prefix = "STLOG_ENV_CONTEXT_"
    prefix_length = len(prefix)
    return {
        name[prefix_length:].lower(): content
        for name, content in os.environ.items()
        if name.startswith(prefix) and len(name) > prefix_length
    }

152 

153 

def get_env_context() -> dict[str, Any]:
    """Build the logging context defined through environment variables.

    The result merges the ``STLOG_ENV_CONTEXT_*`` variables with the JSON
    document of ``STLOG_ENV_JSON_CONTEXT``. The JSON entries are unpacked
    last, so they win when a key is present in both.

    Returns:
        The merged context, or an empty dict when the
        ``STLOG_IGNORE_ENV_CONTEXT`` env variable holds a true value.

    Raises:
        StlogError: if a key is reserved (listed in ``RESERVED_ATTRS``), is
            not a ``str``, or is not a valid Python identifier.
    """
    if check_env_true("STLOG_IGNORE_ENV_CONTEXT", False):
        return {}
    env_context = {**_get_env_context(), **_get_env_json_context()}
    for key in env_context:
        # The messages are formatted here: exception constructors do not
        # interpolate logging-style ("%s", arg) arguments.
        if key in RESERVED_ATTRS:
            raise StlogError("key: %s is not allowed (reserved key)" % (key,))
        if not isinstance(key, str):
            raise StlogError("key: %s must be str" % (key,))
        if not key.isidentifier():
            raise StlogError(
                "key: %s not allowed (must be a valid python identifier)" % (key,)
            )
    return env_context

168 

169 

def check_true(value: str | None, default: bool = False) -> bool:
    """Tell whether ``value`` is one of ``TRUE_VALUES`` (case-insensitive).

    ``default`` is returned when ``value`` is ``None``.
    """
    return default if value is None else value.lower() in TRUE_VALUES

174 

175 

def check_false(value: str | None, default: bool = False) -> bool:
    """Tell whether ``value`` is one of ``FALSE_VALUES`` (case-insensitive).

    ``default`` is returned when ``value`` is ``None``.
    """
    return default if value is None else value.lower() in FALSE_VALUES

180 

181 

def check_env_true(env_var: str, default: bool = False) -> bool:
    """Apply ``check_true`` to the value of the ``env_var`` env variable.

    An unset variable is passed on as ``None``, so ``default`` is returned.
    """
    raw_value = os.getenv(env_var)
    return check_true(raw_value, default)

184 

185 

def check_env_false(env_var: str, default: bool = False) -> bool:
    """Apply ``check_false`` to the value of the ``env_var`` env variable.

    An unset variable is passed on as ``None``, so ``default`` is returned.
    """
    raw_value = os.getenv(env_var)
    return check_false(raw_value, default)

188 

189 

def rich_dump_exception_on_console(
    console: Any,
    exc_type: type[BaseException],
    value: BaseException,
    tb: types.TracebackType | None,
) -> None:
    """Print an exception on ``console`` as a rich ``Traceback``.

    Args:
        console: object whose ``print()`` method receives the traceback
            (presumably a ``rich.console.Console`` - confirm with callers).
        exc_type: class of the exception.
        value: the exception instance.
        tb: traceback attached to the exception, if any.

    NOTE(review): ``Traceback`` is only bound when the ``rich`` import at the
    top of the module succeeded, so this looks like it must only be called
    when ``RICH_AVAILABLE`` is true - confirm.
    """
    render_options = {
        "width": 100,
        "extra_lines": 3,
        "theme": None,
        "word_wrap": False,
        "show_locals": True,
        "locals_max_length": 10,
        "locals_max_string": 80,
        "locals_hide_dunder": True,
        "locals_hide_sunder": False,
        "indent_guides": True,
        "suppress": (),
        "max_frames": 100,
    }
    rendered = Traceback.from_exception(exc_type, value, tb, **render_options)
    console.print(rendered)

215 

216 

_ReStringMatch = Match[str]  # regex match object
_ReSubCallable = Callable[[_ReStringMatch], str]  # Callable invoked by re.sub
_EscapeSubMethod = Callable[[_ReSubCallable, str], str]  # Sub method of a compiled re


# Stolen from https://github.com/Textualize/rich/blob/master/rich/markup.py
def rich_markup_escape(
    markup: str,
    _escape: _EscapeSubMethod = re.compile(r"(\\*)(\[[a-z#/@][^[]*?])").sub,
) -> str:
    """Escape tag-like ``[...]`` sequences found in ``markup``.

    Every match of the pattern (optional backslashes followed by a bracketed
    tag starting with a lowercase letter, ``#``, ``/`` or ``@``) is rewritten
    with its leading backslashes doubled and one extra backslash inserted
    before the opening bracket.

    ``_escape`` is the pre-compiled ``sub`` method; it is bound once, as a
    default argument, when the function is defined.
    """

    def _protect_tag(match: Match[str]) -> str:
        """Build the replacement text for one match (called by re.sub)."""
        leading = match.group(1)
        tag = match.group(2)
        return leading * 2 + "\\" + tag

    return _escape(_protect_tag, markup)

234 

235 

# Adapted from https://github.com/madzak/python-json-logger/blob/master/src/pythonjsonlogger/jsonlogger.py
def parse_format(fmt: str | None, style: str) -> list[str]:
    """
    Extract the field names referenced by a log format string.

    Supported styles are "$" (``${name}``), "{" (``{name}``) and
    "%" (``%(name)``). The captured names are returned in order of
    appearance; an empty or missing ``fmt`` yields an empty list.

    Raises ValueError when ``fmt`` is non-empty and ``style`` is not one of
    the supported styles.
    """
    if not fmt:
        return []
    # One substitution pattern per supported style.
    patterns_by_style = {
        "$": r"\$\{(.+?)\}",
        "{": r"\{(.+?)\}",
        "%": r"%\((.+?)\)",
    }
    if style not in patterns_by_style:
        raise ValueError("Unsupported style: %s" % style)
    return re.findall(patterns_by_style[style], fmt, re.IGNORECASE)

255 

256 

def format_string(fmt: str | None, style: str, record_dict: dict[str, Any]) -> str:
    """Render ``fmt`` with the values of ``record_dict``.

    ``style`` selects the substitution mechanism: "%" uses the ``%``
    operator, "{" uses ``str.format`` and "$" uses ``string.Template``.
    An empty or missing ``fmt`` yields an empty string, whatever the style.

    Raises:
        StlogError: if ``fmt`` is non-empty and ``style`` is not supported.
    """
    if not fmt:
        return ""
    if style == "%":
        return fmt % record_dict
    if style == "{":
        return fmt.format(**record_dict)
    if style == "$":
        template = Template(fmt)
        return template.substitute(**record_dict)
    raise StlogError("Invalid style: %s" % style)