# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function
import itertools
import sys
[docs]def merge_property_list(first_properties, second_properties):
result = {}
for key, value in first_properties.items():
if key in second_properties:
result[key] = _merge_schema(value, second_properties[key])
else:
result[key] = value
for key, value in second_properties.items():
if key not in result:
result[key] = value
return result
[docs]def get_reserved_keys(schema_type):
if (schema_type not in SCHEMA_TYPES or
'reserved_keys' not in SCHEMA_TYPES[schema_type]):
mesg = "Missing implementation for schema type: %s" % schema_type
raise NotImplementedError(mesg)
return SCHEMA_TYPES[schema_type]['reserved_keys']
[docs]def copy_nonreserved_keys(first, second):
reserved_keys = get_reserved_keys(first.get('type'))
return ((key, value)
for key, value in itertools.chain(first.items(), second.items())
if key not in reserved_keys)
[docs]def merge_objects(first, second):
required = list(set(first.get('required', [])) &
set(second.get('required', [])))
result = {
'type': 'object',
'properties': merge_property_list(first.get('properties', {}),
second.get('properties', {})),
}
if required:
result['required'] = required
result.update(copy_nonreserved_keys(first, second))
return result
[docs]def min_or_none(val1, val2):
"""Returns min(val1, val2) returning None only if both values are None"""
return min(val1, val2, key=lambda x: sys.maxint if x is None else x)
[docs]def max_or_none(val1, val2):
"""Returns max(val1, val2) returning None only if both values are None"""
return max(val1, val2, key=lambda x: -sys.maxint if x is None else x)
[docs]def merge_strings(first, second):
result = {'type': 'string'}
result.update(copy_nonreserved_keys(first, second))
minLength = min_or_none(first.get('minLength'), second.get('minLength'))
if minLength:
result['minLength'] = minLength
maxLength = max_or_none(first.get('maxLength'), second.get('maxLength'))
if maxLength:
result['maxLength'] = maxLength
return result
[docs]def merge_numbers(first, second):
return {"type": "number"}
[docs]def merge_booleans(first, second):
return {"type": "boolean"}
[docs]def merge_nulls(first, second):
return {"type": "null"}
[docs]def merge_arrays(first, second):
def is_schema_tuple(item):
return isinstance(item, list)
def are_json_schema_tuples(first_items, second_items):
return all([
is_schema_tuple(first_items),
is_schema_tuple(second_items),
len(first_items) == len(second_items)
])
def merge_tuples(first_items, second_items):
return [_merge_schema(e1, e2) for e1, e2 in zip(first_items, second_items)]
def merge_array_list_with_array_tuple(first_items, second_items):
tuple_items, merged = first_items, second_items
if is_schema_tuple(merged):
tuple_items, merged = merged, tuple_items
for schema in tuple_items:
merged = _merge_schema(merged, schema)
return merged
def merge_items(first_items, second_items):
if not (first_items and second_items):
return None
if are_json_schema_tuples(first_items, second_items):
return merge_tuples(first_items, second_items)
if is_schema_tuple(first_items) or is_schema_tuple(second_items):
return merge_array_list_with_array_tuple(first_items, second_items)
return _merge_schema(first_items, second_items)
items = merge_items(first.get('items'), second.get('items'))
result = {
'type': 'array',
}
if items:
result['items'] = items
result.update(copy_nonreserved_keys(first, second))
return result
[docs]def merge_with_any_of(first, second):
first_any_of = first['anyOf'] if 'anyOf' in first else [first]
second_any_of = second['anyOf'] if 'anyOf' in second else [second]
any_of = []
for schema in first_any_of + second_any_of:
if schema not in any_of:
any_of.append(schema)
return {"anyOf": any_of}
def _merge_schema(first, second):
if first.get('type') != second.get('type'):
return merge_with_any_of(first, second)
if 'anyOf' in first or 'anyOf' in second:
return merge_with_any_of(first, second)
schema_type = first.get('type')
if schema_type not in SCHEMA_TYPES:
raise NotImplementedError("Type %s is not yet supported" % schema_type)
merge_function = SCHEMA_TYPES[schema_type]['merge_function']
return merge_function(first, second)
[docs]def merge_schema(first, second):
"""Returns the result of merging the two given schemas.
"""
if not (type(first) == type(second) == dict):
raise ValueError("Argument is not a schema")
if not (first.get('type') == second.get('type') == 'object'):
raise NotImplementedError("Unsupported root type")
return merge_objects(first, second)
SCHEMA_TYPES = {
'object': {
'merge_function': merge_objects,
'reserved_keys': set(['type', 'properties', 'required']),
},
'string': {
'merge_function': merge_strings,
'reserved_keys': set(['type', 'minLength', 'maxLength']),
},
'array': {
'merge_function': merge_arrays,
'reserved_keys': set(['type', 'items']),
},
'number': {
'merge_function': merge_numbers,
},
'boolean': {
'merge_function': merge_booleans,
},
'null': {
'merge_function': merge_nulls,
},
}