11from datetime import timedelta
22from enum import Enum
3+ from typing import Any , Dict , List , Optional , Union
34import os
45import re
5- from typing import Any , Dict , List , Optional , Union
66
77import yaml
88
99from .utils import resolve_type , substitute_env_vars
1010
11-
1211"""
1312This implements a simple semi-declarative configuration system based on type
1413annotations. Configuration objects derive from BaseConfig, and configuration fields
@@ -67,7 +66,7 @@ def __init__(self, attrs: Dict[str, Any], path: Optional[str] = None):
6766
6867 def _get_path (self , key : str ):
6968 if self .path is not None :
70- return self .path + '/' + key
69+ return self .path + "/" + key
7170 else :
7271 return key
7372
@@ -81,22 +80,24 @@ def iterate_objects(self, parent_key: str):
8180 return
8281
8382 for name , attrs in objects .items ():
84- yield name , Lookup (attrs , parent_key + '/' + name )
83+ yield name , Lookup (attrs , parent_key + "/" + name )
8584
8685 def _get (self , key : str , default : Any ):
8786 if default is Defaults .REQUIRED :
8887 try :
8988 return self .attrs [key ]
9089 except KeyError :
91- raise ConfigError ("A value is required for {}" .format (self ._get_path (key ))) \
92- from None
90+ raise ConfigError (
91+ "A value is required for {}" .format (self ._get_path (key ))
92+ ) from None
9393 else :
9494 return self .attrs .get (key , default )
9595
9696 def get_str (
97- self , key : str ,
97+ self ,
98+ key : str ,
9899 default : Union [str , None , Defaults ] = Defaults .REQUIRED ,
99- force_trailing_slash : bool = False
100+ force_trailing_slash : bool = False ,
100101 ) -> Optional [str ]:
101102 val = self ._get (key , default )
102103 if val is None :
@@ -107,8 +108,8 @@ def get_str(
107108
108109 val = substitute_env_vars (val )
109110
110- if force_trailing_slash and not val .endswith ('/' ):
111- val += '/'
111+ if force_trailing_slash and not val .endswith ("/" ):
112+ val += "/"
112113
113114 return val
114115
@@ -140,18 +141,20 @@ def get_regex_list(
140141 ) -> List [re .Pattern ]:
141142 val = self ._get (key , default )
142143 if not isinstance (val , list ) or not all (isinstance (v , str ) for v in val ):
143- raise ConfigError ("{} must be a list of regular expressions" . format (
144- self ._get_path (key ))
144+ raise ConfigError (
145+ "{} must be a list of regular expressions" . format ( self ._get_path (key ))
145146 )
146147
147148 result = []
148149 for v in val :
149150 try :
150151 result .append (re .compile (v ))
151152 except re .error as e :
152- raise ConfigError ("{}: '{}' is not a valid regular expression: {}" .format (
153- self ._get_path (key ), v , e .msg
154- ))
153+ raise ConfigError (
154+ "{}: '{}' is not a valid regular expression: {}" .format (
155+ self ._get_path (key ), v , e .msg
156+ )
157+ )
155158
156159 return result
157160
@@ -165,9 +168,10 @@ def get_str_dict(
165168 return {substitute_env_vars (k ): substitute_env_vars (v ) for k , v in val .items ()}
166169
167170 def get_timedelta (
168- self , key : str ,
171+ self ,
172+ key : str ,
169173 default : Union [timedelta , None , Defaults ] = Defaults .REQUIRED ,
170- force_suffix : bool = True
174+ force_suffix : bool = True ,
171175 ) -> Optional [timedelta ]:
172176 val = self ._get (key , default = default )
173177 if val is None :
@@ -180,7 +184,7 @@ def get_timedelta(
180184 return timedelta (seconds = val )
181185
182186 if isinstance (val , str ):
183- m = re .match (r' ^(\d+)([dhms])$' , val )
187+ m = re .match (r" ^(\d+)([dhms])$" , val )
184188 if m :
185189 if m .group (2 ) == "d" :
186190 return timedelta (days = int (m .group (1 )))
@@ -191,16 +195,17 @@ def get_timedelta(
191195 else :
192196 return timedelta (seconds = int (m .group (1 )))
193197
194- raise ConfigError ("{} should be a time interval of the form <digits>[dhms]"
195- .format (self ._get_path (key )))
198+ raise ConfigError (
199+ "{} should be a time interval of the form <digits>[dhms]" .format (self ._get_path (key ))
200+ )
196201
197202
198203class BaseConfig :
199204 def _init_from_class (self , cls , lookup : Lookup ):
200205 if not issubclass (cls , BaseConfig ):
201206 return
202207
203- annotations = getattr (cls , ' __annotations__' , None )
208+ annotations = getattr (cls , " __annotations__" , None )
204209 if annotations :
205210 for name , v in annotations .items ():
206211 resolved , args , optional = resolve_type (v )
@@ -213,19 +218,19 @@ def _init_from_class(self, cls, lookup: Lookup):
213218 else :
214219 kwargs = {"default" : classval }
215220
216- if resolved == str :
221+ if resolved is str :
217222 val : Any = lookup .get_str (name , ** kwargs )
218- elif resolved == bool :
223+ elif resolved is bool :
219224 val = lookup .get_bool (name , ** kwargs )
220- elif resolved == int :
225+ elif resolved is int :
221226 val = lookup .get_int (name , ** kwargs )
222- elif resolved == list and args [0 ] == str :
227+ elif resolved is list and args [0 ] is str :
223228 val = lookup .get_str_list (name , ** kwargs )
224- elif resolved == list and args [0 ] == re .Pattern :
229+ elif resolved is list and args [0 ] is re .Pattern :
225230 val = lookup .get_regex_list (name , ** kwargs )
226- elif resolved == dict and args [0 ] == str and args [1 ] == str :
231+ elif resolved is dict and args [0 ] is str and args [1 ] is str :
227232 val = lookup .get_str_dict (name , ** kwargs )
228- elif resolved == timedelta :
233+ elif resolved is timedelta :
229234 val = lookup .get_timedelta (name , ** kwargs )
230235 elif issubclass (resolved , BaseConfig ):
231236 val = resolved (lookup .sublookup (name ))
@@ -240,7 +245,7 @@ def __init__(self, lookup: Lookup):
240245
241246 @classmethod
242247 def from_path (cls , path : str ):
243- with open (path , 'r' ) as f :
248+ with open (path , "r" ) as f :
244249 yml = yaml .safe_load (f )
245250
246251 if yml is None :
0 commit comments