Source code for skinfer.json_schema_merger

# -*- coding: utf-8 -*-

from __future__ import absolute_import, division, print_function, unicode_literals
import itertools


[docs]def merge_property_list(first_properties, second_properties): result = {} for key, value in first_properties.items(): if key in second_properties: result[key] = _merge_schema(value, second_properties[key]) else: result[key] = value for key, value in second_properties.items(): if key not in result: result[key] = value return result
[docs]def get_reserved_keys(schema_type): if schema_type == 'object': return set(['type', 'properties', 'required']) if schema_type == 'array': return set(['type', 'items']) else: raise NotImplementedError( "Missing implementation for schema type: %s" % schema_type)
[docs]def copy_nonreserved_keys(first, second): reserved_keys = get_reserved_keys(first.get('type')) return ((key, value) for key, value in itertools.chain(first.items(), second.items()) if key not in reserved_keys)
[docs]def merge_objects(first, second): required = list(set(first.get('required', [])) & set(second.get('required', []))) result = { 'type': 'object', 'properties': merge_property_list(first.get('properties', {}), second.get('properties', {})), } if required: result['required'] = required result.update(copy_nonreserved_keys(first, second)) return result
[docs]def merge_strings(first, second): # TODO: add support for schemas more complex than {"type": "string"} return second
[docs]def merge_numbers(first, second): return {"type": "number"}
[docs]def merge_booleans(first, second): return {"type": "boolean"}
[docs]def merge_nulls(first, second): return {"type": "null"}
[docs]def merge_arrays(first, second): def are_json_schema_tuples(first_items, second_items): return all([ isinstance(first_items, list), isinstance(second_items, list), len(first_items) == len(second_items) ]) def merge_tuples(first_items, second_items): return [_merge_schema(e1, e2) for e1, e2 in zip(first_items, second_items)] def merge_items(first_items, second_items): if not (first_items and second_items): return None if are_json_schema_tuples(first_items, second_items): return merge_tuples(first_items, second_items) return _merge_schema(first_items, second_items) items = merge_items(first.get('items'), second.get('items')) result = { 'type': 'array', } if items: result['items'] = items result.update(copy_nonreserved_keys(first, second)) return result
[docs]def merge_with_any_of(first, second): first_any_of = first['anyOf'] if 'anyOf' in first else [first] second_any_of = second['anyOf'] if 'anyOf' in second else [second] any_of = [] for schema in first_any_of + second_any_of: if schema not in any_of: any_of.append(schema) return {"anyOf": any_of}
def _merge_schema(first, second): if first.get('type') != second.get('type'): return merge_with_any_of(first, second) if 'anyOf' in first or 'anyOf' in second: return merge_with_any_of(first, second) schema_type = first.get('type') if schema_type == 'object': return merge_objects(first, second) elif schema_type == 'string': return merge_strings(first, second) elif schema_type == 'array': return merge_arrays(first, second) elif schema_type == 'number': return merge_numbers(first, second) elif schema_type == 'boolean': return merge_booleans(first, second) elif schema_type == 'null': return merge_nulls(first, second) else: raise NotImplementedError("Type %s is not yet supported" % schema_type)
[docs]def merge_schema(first, second): if not (type(first) == type(second) == dict): raise ValueError("Argument is not a schema") if not (first.get('type') == second.get('type') == 'object'): raise NotImplementedError("Unsupported root type") return merge_objects(first, second)