schema.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383
  1. """schema is a library for validating Python data structures, such as those
  2. obtained from config-files, forms, external services or command-line
  3. parsing, converted from JSON/YAML (or something else) to Python data-types."""
  4. import re
# Version string of this library.
__version__ = '0.6.7'
# Public names of this module (the ones a star-import exposes): the Schema
# entry point, the combinators/markers, and the exception hierarchy.
__all__ = ['Schema',
           'And', 'Or', 'Regex', 'Optional', 'Use', 'Forbidden', 'Const',
           'SchemaError',
           'SchemaWrongKeyError',
           'SchemaMissingKeyError',
           'SchemaForbiddenKeyError',
           'SchemaUnexpectedTypeError']
  13. class SchemaError(Exception):
  14. """Error during Schema validation."""
  15. def __init__(self, autos, errors=None):
  16. self.autos = autos if type(autos) is list else [autos]
  17. self.errors = errors if type(errors) is list else [errors]
  18. Exception.__init__(self, self.code)
  19. @property
  20. def code(self):
  21. """
  22. Removes duplicates values in auto and error list.
  23. parameters.
  24. """
  25. def uniq(seq):
  26. """
  27. Utility function that removes duplicate.
  28. """
  29. seen = set()
  30. seen_add = seen.add
  31. # This way removes duplicates while preserving the order.
  32. return [x for x in seq if x not in seen and not seen_add(x)]
  33. data_set = uniq(i for i in self.autos if i is not None)
  34. error_list = uniq(i for i in self.errors if i is not None)
  35. if error_list:
  36. return '\n'.join(error_list)
  37. return '\n'.join(data_set)
class SchemaWrongKeyError(SchemaError):
    """Error that should be raised when an unexpected key is detected within
    the data set being validated."""
    pass
class SchemaMissingKeyError(SchemaError):
    """Error that should be raised when a mandatory key is not found within
    the data set being validated."""
    pass
class SchemaForbiddenKeyError(SchemaError):
    """Error that should be raised when a forbidden key is found within the
    data set being validated and its value matches the value that was
    specified for that key."""
    pass
class SchemaUnexpectedTypeError(SchemaError):
    """Error that should be raised when a type mismatch is detected within
    the data set being validated."""
    pass
  54. class And:
  55. """
  56. Utility function to combine validation directives in AND Boolean fashion.
  57. """
  58. def __init__(self, *args, **kw):
  59. self._args = args
  60. assert set(kw).issubset(['error', 'schema', 'ignore_extra_keys'])
  61. self._error = kw.get('error')
  62. self._ignore_extra_keys = kw.get('ignore_extra_keys', False)
  63. # You can pass your inherited Schema class.
  64. self._schema = kw.get('schema', Schema)
  65. def __repr__(self):
  66. return '%s(%s)' % (self.__class__.__name__,
  67. ', '.join(repr(a) for a in self._args))
  68. def validate(self, data):
  69. """
  70. Validate data using defined sub schema/expressions ensuring all
  71. values are valid.
  72. :param data: to be validated with sub defined schemas.
  73. :return: returns validated data
  74. """
  75. for s in [self._schema(s, error=self._error,
  76. ignore_extra_keys=self._ignore_extra_keys)
  77. for s in self._args]:
  78. data = s.validate(data)
  79. return data
  80. class Or(And):
  81. """Utility function to combine validation directives in a OR Boolean
  82. fashion."""
  83. def validate(self, data):
  84. """
  85. Validate data using sub defined schema/expressions ensuring at least
  86. one value is valid.
  87. :param data: data to be validated by provided schema.
  88. :return: return validated data if not validation
  89. """
  90. x = SchemaError([], [])
  91. for s in [self._schema(s, error=self._error,
  92. ignore_extra_keys=self._ignore_extra_keys)
  93. for s in self._args]:
  94. try:
  95. return s.validate(data)
  96. except SchemaError as _x:
  97. x = _x
  98. raise SchemaError(['%r did not validate %r' % (self, data)] + x.autos,
  99. [self._error.format(data) if self._error else None] +
  100. x.errors)
  101. class Regex:
  102. """
  103. Enables schema.py to validate string using regular expressions.
  104. """
  105. # Map all flags bits to a more readable description
  106. NAMES = ['re.ASCII', 're.DEBUG', 're.VERBOSE', 're.UNICODE', 're.DOTALL',
  107. 're.MULTILINE', 're.LOCALE', 're.IGNORECASE', 're.TEMPLATE']
  108. def __init__(self, pattern_str, flags=0, error=None):
  109. self._pattern_str = pattern_str
  110. flags_list = [Regex.NAMES[i] for i, f in # Name for each bit
  111. enumerate('{0:09b}'.format(flags)) if f != '0']
  112. if flags_list:
  113. self._flags_names = ', flags=' + '|'.join(flags_list)
  114. else:
  115. self._flags_names = ''
  116. self._pattern = re.compile(pattern_str, flags=flags)
  117. self._error = error
  118. def __repr__(self):
  119. return '%s(%r%s)' % (
  120. self.__class__.__name__, self._pattern_str, self._flags_names
  121. )
  122. def validate(self, data):
  123. """
  124. Validated data using defined regex.
  125. :param data: data to be validated
  126. :return: return validated data.
  127. """
  128. e = self._error
  129. try:
  130. if self._pattern.search(data):
  131. return data
  132. else:
  133. raise SchemaError('%r does not match %r' % (self, data), e)
  134. except TypeError:
  135. raise SchemaError('%r is not string nor buffer' % data, e)
  136. class Use:
  137. """
  138. For more general use cases, you can use the Use class to transform
  139. the data while it is being validate.
  140. """
  141. def __init__(self, callable_, error=None):
  142. assert callable(callable_)
  143. self._callable = callable_
  144. self._error = error
  145. def __repr__(self):
  146. return '%s(%r)' % (self.__class__.__name__, self._callable)
  147. def validate(self, data):
  148. try:
  149. return self._callable(data)
  150. except SchemaError as x:
  151. raise SchemaError([None] + x.autos,
  152. [self._error.format(data)
  153. if self._error else None] + x.errors)
  154. except BaseException as x:
  155. f = _callable_str(self._callable)
  156. raise SchemaError('%s(%r) raised %r' % (f, data, x),
  157. self._error.format(data)
  158. if self._error else None)
  159. COMPARABLE, CALLABLE, VALIDATOR, TYPE, DICT, ITERABLE = range(6)
  160. def _priority(s):
  161. """Return priority for a given object."""
  162. if type(s) in (list, tuple, set, frozenset):
  163. return ITERABLE
  164. if type(s) is dict:
  165. return DICT
  166. if issubclass(type(s), type):
  167. return TYPE
  168. if hasattr(s, 'validate'):
  169. return VALIDATOR
  170. if callable(s):
  171. return CALLABLE
  172. else:
  173. return COMPARABLE
  174. class Schema:
  175. """
  176. Entry point of the library, use this class to instantiate validation
  177. schema for the data that will be validated.
  178. """
  179. def __init__(self, schema, error=None, ignore_extra_keys=False):
  180. self._schema = schema
  181. self._error = error
  182. self._ignore_extra_keys = ignore_extra_keys
  183. def __repr__(self):
  184. return '%s(%r)' % (self.__class__.__name__, self._schema)
  185. @staticmethod
  186. def _dict_key_priority(s):
  187. """Return priority for a given key object."""
  188. if isinstance(s, Forbidden):
  189. return _priority(s._schema) - 0.5
  190. if isinstance(s, Optional):
  191. return _priority(s._schema) + 0.5
  192. return _priority(s)
  193. def validate(self, data):
  194. Schema = self.__class__
  195. s = self._schema
  196. e = self._error
  197. i = self._ignore_extra_keys
  198. flavor = _priority(s)
  199. if flavor == ITERABLE:
  200. data = Schema(type(s), error=e).validate(data)
  201. o = Or(*s, error=e, schema=Schema, ignore_extra_keys=i)
  202. return type(data)(o.validate(d) for d in data)
  203. if flavor == DICT:
  204. data = Schema(dict, error=e).validate(data)
  205. new = type(data)() # new - is a dict of the validated values
  206. coverage = set() # matched schema keys
  207. # for each key and value find a schema entry matching them, if any
  208. sorted_skeys = sorted(s, key=self._dict_key_priority)
  209. for key, value in data.items():
  210. for skey in sorted_skeys:
  211. svalue = s[skey]
  212. try:
  213. nkey = Schema(skey, error=e).validate(key)
  214. except SchemaError:
  215. pass
  216. else:
  217. if isinstance(skey, Forbidden):
  218. # As the content of the value makes little sense for
  219. # forbidden keys, we reverse its meaning:
  220. # we will only raise the SchemaErrorForbiddenKey
  221. # exception if the value does match, allowing for
  222. # excluding a key only if its value has a certain type,
  223. # and allowing Forbidden to work well in combination
  224. # with Optional.
  225. try:
  226. nvalue = Schema(svalue, error=e).validate(value)
  227. except SchemaError:
  228. continue
  229. raise SchemaForbiddenKeyError(
  230. 'Forbidden key encountered: %r in %r' %
  231. (nkey, data), e)
  232. try:
  233. nvalue = Schema(svalue, error=e,
  234. ignore_extra_keys=i).validate(value)
  235. except SchemaError as x:
  236. k = "Key '%s' error:" % nkey
  237. raise SchemaError([k] + x.autos, [e] + x.errors)
  238. else:
  239. new[nkey] = nvalue
  240. coverage.add(skey)
  241. break
  242. required = {k for k in s if type(k) not in [Optional, Forbidden]}
  243. if not required.issubset(coverage):
  244. missing_keys = required - coverage
  245. s_missing_keys = \
  246. ', '.join(repr(k) for k in sorted(missing_keys, key=repr))
  247. raise \
  248. SchemaMissingKeyError('Missing keys: ' + s_missing_keys, e)
  249. if not self._ignore_extra_keys and (len(new) != len(data)):
  250. wrong_keys = set(data.keys()) - set(new.keys())
  251. s_wrong_keys = \
  252. ', '.join(repr(k) for k in sorted(wrong_keys, key=repr))
  253. raise \
  254. SchemaWrongKeyError(
  255. 'Wrong keys %s in %r' % (s_wrong_keys, data),
  256. e.format(data) if e else None)
  257. # Apply default-having optionals that haven't been used:
  258. defaults = {k for k in s if type(k) is Optional and
  259. hasattr(k, 'default')} - coverage
  260. for default in defaults:
  261. new[default.key] = default.default
  262. return new
  263. if flavor == TYPE:
  264. if isinstance(data, s):
  265. return data
  266. else:
  267. raise SchemaUnexpectedTypeError(
  268. '%r should be instance of %r' % (data, s.__name__),
  269. e.format(data) if e else None)
  270. if flavor == VALIDATOR:
  271. try:
  272. return s.validate(data)
  273. except SchemaError as x:
  274. raise SchemaError([None] + x.autos, [e] + x.errors)
  275. except BaseException as x:
  276. raise SchemaError(
  277. '%r.validate(%r) raised %r' % (s, data, x),
  278. self._error.format(data) if self._error else None)
  279. if flavor == CALLABLE:
  280. f = _callable_str(s)
  281. try:
  282. if s(data):
  283. return data
  284. except SchemaError as x:
  285. raise SchemaError([None] + x.autos, [e] + x.errors)
  286. except BaseException as x:
  287. raise SchemaError(
  288. '%s(%r) raised %r' % (f, data, x),
  289. self._error.format(data) if self._error else None)
  290. raise SchemaError('%s(%r) should evaluate to True' % (f, data), e)
  291. if s == data:
  292. return data
  293. else:
  294. raise SchemaError('%r does not match %r' % (s, data),
  295. e.format(data) if e else None)
  296. class Optional(Schema):
  297. """Marker for an optional part of the validation Schema."""
  298. _MARKER = object()
  299. def __init__(self, *args, **kwargs):
  300. default = kwargs.pop('default', self._MARKER)
  301. super(Optional, self).__init__(*args, **kwargs)
  302. if default is not self._MARKER:
  303. # See if I can come up with a static key to use for myself:
  304. if _priority(self._schema) != COMPARABLE:
  305. raise TypeError(
  306. 'Optional keys with defaults must have simple, '
  307. 'predictable values, like literal strings or ints. '
  308. '"%r" is too complex.' % (self._schema,))
  309. self.default = default
  310. self.key = self._schema
  311. def __hash__(self):
  312. return hash(self._schema)
  313. def __eq__(self, other):
  314. return (self.__class__ is other.__class__ and
  315. getattr(self, 'default', self._MARKER) ==
  316. getattr(other, 'default', self._MARKER) and
  317. self._schema == other._schema)
class Forbidden(Schema):
    """Marker for a forbidden key of a dict schema.

    Takes the same arguments as Schema and additionally exposes the wrapped
    schema as ``key``.
    """

    def __init__(self, *args, **kwargs):
        super(Forbidden, self).__init__(*args, **kwargs)
        # Keep the wrapped schema available under the ``key`` attribute.
        self.key = self._schema
class Const(Schema):
    """Schema variant that validates the data but returns it unchanged."""

    def validate(self, data):
        """Validate ``data`` with the parent class and return the original
        ``data`` object, discarding whatever the parent validation returned.

        Any error raised by the parent validation propagates.
        """
        super(Const, self).validate(data)
        return data
  326. def _callable_str(callable_):
  327. if hasattr(callable_, '__name__'):
  328. return callable_.__name__
  329. return str(callable_)