diff options
Diffstat (limited to 'tests/model_fields/test_jsonfield.py')
| -rw-r--r-- | tests/model_fields/test_jsonfield.py | 667 |
1 files changed, 667 insertions, 0 deletions
diff --git a/tests/model_fields/test_jsonfield.py b/tests/model_fields/test_jsonfield.py new file mode 100644 index 0000000000..464cf163d4 --- /dev/null +++ b/tests/model_fields/test_jsonfield.py @@ -0,0 +1,667 @@ +import operator +import uuid +from unittest import mock, skipIf, skipUnless + +from django import forms +from django.core import serializers +from django.core.exceptions import ValidationError +from django.core.serializers.json import DjangoJSONEncoder +from django.db import ( + DataError, IntegrityError, NotSupportedError, OperationalError, connection, + models, +) +from django.db.models import Count, F, OuterRef, Q, Subquery, Transform, Value +from django.db.models.expressions import RawSQL +from django.db.models.fields.json import ( + KeyTextTransform, KeyTransform, KeyTransformFactory, + KeyTransformTextLookupMixin, +) +from django.db.models.functions import Cast +from django.test import SimpleTestCase, TestCase, skipUnlessDBFeature +from django.test.utils import CaptureQueriesContext + +from .models import CustomJSONDecoder, JSONModel, NullableJSONModel + + +@skipUnlessDBFeature('supports_json_field') +class JSONFieldTests(TestCase): + def test_invalid_value(self): + msg = 'is not JSON serializable' + with self.assertRaisesMessage(TypeError, msg): + NullableJSONModel.objects.create(value={ + 'uuid': uuid.UUID('d85e2076-b67c-4ee7-8c3a-2bf5a2cc2475'), + }) + + def test_custom_encoder_decoder(self): + value = {'uuid': uuid.UUID('{d85e2076-b67c-4ee7-8c3a-2bf5a2cc2475}')} + obj = NullableJSONModel(value_custom=value) + obj.clean_fields() + obj.save() + obj.refresh_from_db() + self.assertEqual(obj.value_custom, value) + + def test_db_check_constraints(self): + value = '{@!invalid json value 123 $!@#' + with mock.patch.object(DjangoJSONEncoder, 'encode', return_value=value): + with self.assertRaises((IntegrityError, DataError, OperationalError)): + NullableJSONModel.objects.create(value_custom=value) + + +class TestMethods(SimpleTestCase): + def test_deconstruct(self): + field = models.JSONField() + name, path, args, kwargs = field.deconstruct() + self.assertEqual(path, 'django.db.models.JSONField') + self.assertEqual(args, []) + self.assertEqual(kwargs, {}) + + def test_deconstruct_custom_encoder_decoder(self): + field = models.JSONField(encoder=DjangoJSONEncoder, decoder=CustomJSONDecoder) + name, path, args, kwargs = field.deconstruct() + self.assertEqual(kwargs['encoder'], DjangoJSONEncoder) + self.assertEqual(kwargs['decoder'], CustomJSONDecoder) + + def test_get_transforms(self): + @models.JSONField.register_lookup + class MyTransform(Transform): + lookup_name = 'my_transform' + field = models.JSONField() + transform = field.get_transform('my_transform') + self.assertIs(transform, MyTransform) + models.JSONField._unregister_lookup(MyTransform) + models.JSONField._clear_cached_lookups() + transform = field.get_transform('my_transform') + self.assertIsInstance(transform, KeyTransformFactory) + + def test_key_transform_text_lookup_mixin_non_key_transform(self): + transform = Transform('test') + msg = ( + 'Transform should be an instance of KeyTransform in order to use ' + 'this lookup.' + ) + with self.assertRaisesMessage(TypeError, msg): + KeyTransformTextLookupMixin(transform) + + +class TestValidation(SimpleTestCase): + def test_invalid_encoder(self): + msg = 'The encoder parameter must be a callable object.' + with self.assertRaisesMessage(ValueError, msg): + models.JSONField(encoder=DjangoJSONEncoder()) + + def test_invalid_decoder(self): + msg = 'The decoder parameter must be a callable object.' + with self.assertRaisesMessage(ValueError, msg): + models.JSONField(decoder=CustomJSONDecoder()) + + def test_validation_error(self): + field = models.JSONField() + msg = 'Value must be valid JSON.' + value = uuid.UUID('{d85e2076-b67c-4ee7-8c3a-2bf5a2cc2475}') + with self.assertRaisesMessage(ValidationError, msg): + field.clean({'uuid': value}, None) + + def test_custom_encoder(self): + field = models.JSONField(encoder=DjangoJSONEncoder) + value = uuid.UUID('{d85e2076-b67c-4ee7-8c3a-2bf5a2cc2475}') + field.clean({'uuid': value}, None) + + +class TestFormField(SimpleTestCase): + def test_formfield(self): + model_field = models.JSONField() + form_field = model_field.formfield() + self.assertIsInstance(form_field, forms.JSONField) + + def test_formfield_custom_encoder_decoder(self): + model_field = models.JSONField(encoder=DjangoJSONEncoder, decoder=CustomJSONDecoder) + form_field = model_field.formfield() + self.assertIs(form_field.encoder, DjangoJSONEncoder) + self.assertIs(form_field.decoder, CustomJSONDecoder) + + +class TestSerialization(SimpleTestCase): + test_data = ( + '[{"fields": {"value": %s}, ' + '"model": "model_fields.jsonmodel", "pk": null}]' + ) + test_values = ( + # (Python value, serialized value), + ({'a': 'b', 'c': None}, '{"a": "b", "c": null}'), + ('abc', '"abc"'), + ('{"a": "a"}', '"{\\"a\\": \\"a\\"}"'), + ) + + def test_dumping(self): + for value, serialized in self.test_values: + with self.subTest(value=value): + instance = JSONModel(value=value) + data = serializers.serialize('json', [instance]) + self.assertJSONEqual(data, self.test_data % serialized) + + def test_loading(self): + for value, serialized in self.test_values: + with self.subTest(value=value): + instance = list( + serializers.deserialize('json', self.test_data % serialized) + )[0].object + self.assertEqual(instance.value, value) + + +@skipUnlessDBFeature('supports_json_field') +class TestSaveLoad(TestCase): + def test_null(self): + obj = NullableJSONModel(value=None) + obj.save() + obj.refresh_from_db() + self.assertIsNone(obj.value) + + @skipUnlessDBFeature('supports_primitives_in_json_field') + def test_json_null_different_from_sql_null(self): + json_null = NullableJSONModel.objects.create(value=Value('null')) + json_null.refresh_from_db() + sql_null = NullableJSONModel.objects.create(value=None) + sql_null.refresh_from_db() + # 'null' is not equal to NULL in the database. + self.assertSequenceEqual( + NullableJSONModel.objects.filter(value=Value('null')), + [json_null], + ) + self.assertSequenceEqual( + NullableJSONModel.objects.filter(value=None), + [json_null], + ) + self.assertSequenceEqual( + NullableJSONModel.objects.filter(value__isnull=True), + [sql_null], + ) + # 'null' is equal to NULL in Python (None). + self.assertEqual(json_null.value, sql_null.value) + + @skipUnlessDBFeature('supports_primitives_in_json_field') + def test_primitives(self): + values = [ + True, + 1, + 1.45, + 'String', + '', + ] + for value in values: + with self.subTest(value=value): + obj = JSONModel(value=value) + obj.save() + obj.refresh_from_db() + self.assertEqual(obj.value, value) + + def test_dict(self): + values = [ + {}, + {'name': 'John', 'age': 20, 'height': 180.3}, + {'a': True, 'b': {'b1': False, 'b2': None}}, + ] + for value in values: + with self.subTest(value=value): + obj = JSONModel.objects.create(value=value) + obj.refresh_from_db() + self.assertEqual(obj.value, value) + + def test_list(self): + values = [ + [], + ['John', 20, 180.3], + [True, [False, None]], + ] + for value in values: + with self.subTest(value=value): + obj = JSONModel.objects.create(value=value) + obj.refresh_from_db() + self.assertEqual(obj.value, value) + + def test_realistic_object(self): + value = { + 'name': 'John', + 'age': 20, + 'pets': [ + {'name': 'Kit', 'type': 'cat', 'age': 2}, + {'name': 'Max', 'type': 'dog', 'age': 1}, + ], + 'courses': [ + ['A1', 'A2', 'A3'], + ['B1', 'B2'], + ['C1'], + ], + } + obj = JSONModel.objects.create(value=value) + obj.refresh_from_db() + self.assertEqual(obj.value, value) + + +@skipUnlessDBFeature('supports_json_field') +class TestQuerying(TestCase): + @classmethod + def setUpTestData(cls): + cls.primitives = [True, False, 'yes', 7, 9.6] + values = [ + None, + [], + {}, + {'a': 'b', 'c': 14}, + { + 'a': 'b', + 'c': 14, + 'd': ['e', {'f': 'g'}], + 'h': True, + 'i': False, + 'j': None, + 'k': {'l': 'm'}, + 'n': [None], + }, + [1, [2]], + {'k': True, 'l': False}, + { + 'foo': 'bar', + 'baz': {'a': 'b', 'c': 'd'}, + 'bar': ['foo', 'bar'], + 'bax': {'foo': 'bar'}, + }, + ] + cls.objs = [ + NullableJSONModel.objects.create(value=value) + for value in values + ] + if connection.features.supports_primitives_in_json_field: + cls.objs.extend([ + NullableJSONModel.objects.create(value=value) + for value in cls.primitives + ]) + cls.raw_sql = '%s::jsonb' if connection.vendor == 'postgresql' else '%s' + + def test_exact(self): + self.assertSequenceEqual( + NullableJSONModel.objects.filter(value__exact={}), + [self.objs[2]], + ) + + def test_exact_complex(self): + self.assertSequenceEqual( + NullableJSONModel.objects.filter(value__exact={'a': 'b', 'c': 14}), + [self.objs[3]], + ) + + def test_isnull(self): + self.assertSequenceEqual( + NullableJSONModel.objects.filter(value__isnull=True), + [self.objs[0]], + ) + + def test_ordering_by_transform(self): + objs = [ + NullableJSONModel.objects.create(value={'ord': 93, 'name': 'bar'}), + NullableJSONModel.objects.create(value={'ord': 22.1, 'name': 'foo'}), + NullableJSONModel.objects.create(value={'ord': -1, 'name': 'baz'}), + NullableJSONModel.objects.create(value={'ord': 21.931902, 'name': 'spam'}), + NullableJSONModel.objects.create(value={'ord': -100291029, 'name': 'eggs'}), + ] + query = NullableJSONModel.objects.filter(value__name__isnull=False).order_by('value__ord') + expected = [objs[4], objs[2], objs[3], objs[1], objs[0]] + mariadb = connection.vendor == 'mysql' and connection.mysql_is_mariadb + if mariadb or connection.vendor == 'oracle': + # MariaDB and Oracle return JSON values as strings. + expected = [objs[2], objs[4], objs[3], objs[1], objs[0]] + self.assertSequenceEqual(query, expected) + + def test_ordering_grouping_by_key_transform(self): + base_qs = NullableJSONModel.objects.filter(value__d__0__isnull=False) + for qs in ( + base_qs.order_by('value__d__0'), + base_qs.annotate(key=KeyTransform('0', KeyTransform('d', 'value'))).order_by('key'), + ): + self.assertSequenceEqual(qs, [self.objs[4]]) + qs = NullableJSONModel.objects.filter(value__isnull=False) + self.assertQuerysetEqual( + qs.filter(value__isnull=False).annotate( + key=KeyTextTransform('f', KeyTransform('1', KeyTransform('d', 'value'))), + ).values('key').annotate(count=Count('key')).order_by('count'), + [(None, 0), ('g', 1)], + operator.itemgetter('key', 'count'), + ) + + @skipIf(connection.vendor == 'oracle', "Oracle doesn't support grouping by LOBs, see #24096.") + def test_ordering_grouping_by_count(self): + qs = NullableJSONModel.objects.filter( + value__isnull=False, + ).values('value__d__0').annotate(count=Count('value__d__0')).order_by('count') + self.assertQuerysetEqual(qs, [1, 11], operator.itemgetter('count')) + + def test_key_transform_raw_expression(self): + expr = RawSQL(self.raw_sql, ['{"x": "bar"}']) + self.assertSequenceEqual( + NullableJSONModel.objects.filter(value__foo=KeyTransform('x', expr)), + [self.objs[7]], + ) + + def test_nested_key_transform_raw_expression(self): + expr = RawSQL(self.raw_sql, ['{"x": {"y": "bar"}}']) + self.assertSequenceEqual( + NullableJSONModel.objects.filter(value__foo=KeyTransform('y', KeyTransform('x', expr))), + [self.objs[7]], + ) + + def test_key_transform_expression(self): + self.assertSequenceEqual( + NullableJSONModel.objects.filter(value__d__0__isnull=False).annotate( + key=KeyTransform('d', 'value'), + chain=KeyTransform('0', 'key'), + expr=KeyTransform('0', Cast('key', models.JSONField())), + ).filter(chain=F('expr')), + [self.objs[4]], + ) + + def test_nested_key_transform_expression(self): + self.assertSequenceEqual( + NullableJSONModel.objects.filter(value__d__0__isnull=False).annotate( + key=KeyTransform('d', 'value'), + chain=KeyTransform('f', KeyTransform('1', 'key')), + expr=KeyTransform('f', KeyTransform('1', Cast('key', models.JSONField()))), + ).filter(chain=F('expr')), + [self.objs[4]], + ) + + def test_has_key(self): + self.assertSequenceEqual( + NullableJSONModel.objects.filter(value__has_key='a'), + [self.objs[3], self.objs[4]], + ) + + def test_has_key_null_value(self): + self.assertSequenceEqual( + NullableJSONModel.objects.filter(value__has_key='j'), + [self.objs[4]], + ) + + def test_has_key_deep(self): + tests = [ + (Q(value__baz__has_key='a'), self.objs[7]), + (Q(value__has_key=KeyTransform('a', KeyTransform('baz', 'value'))), self.objs[7]), + (Q(value__has_key=KeyTransform('c', KeyTransform('baz', 'value'))), self.objs[7]), + (Q(value__d__1__has_key='f'), self.objs[4]), + ( + Q(value__has_key=KeyTransform('f', KeyTransform('1', KeyTransform('d', 'value')))), + self.objs[4], + ) + ] + for condition, expected in tests: + with self.subTest(condition=condition): + self.assertSequenceEqual( + NullableJSONModel.objects.filter(condition), + [expected], + ) + + def test_has_key_list(self): + obj = NullableJSONModel.objects.create(value=[{'a': 1}, {'b': 'x'}]) + tests = [ + Q(value__1__has_key='b'), + Q(value__has_key=KeyTransform('b', KeyTransform(1, 'value'))), + Q(value__has_key=KeyTransform('b', KeyTransform('1', 'value'))), + ] + for condition in tests: + with self.subTest(condition=condition): + self.assertSequenceEqual( + NullableJSONModel.objects.filter(condition), + [obj], + ) + + def test_has_keys(self): + self.assertSequenceEqual( + NullableJSONModel.objects.filter(value__has_keys=['a', 'c', 'h']), + [self.objs[4]], + ) + + def test_has_any_keys(self): + self.assertSequenceEqual( + NullableJSONModel.objects.filter(value__has_any_keys=['c', 'l']), + [self.objs[3], self.objs[4], self.objs[6]], + ) + + def test_contains(self): + tests = [ + ({}, self.objs[2:5] + self.objs[6:8]), + ({'baz': {'a': 'b', 'c': 'd'}}, [self.objs[7]]), + ({'k': True, 'l': False}, [self.objs[6]]), + ({'d': ['e', {'f': 'g'}]}, [self.objs[4]]), + ([1, [2]], [self.objs[5]]), + ({'n': [None]}, [self.objs[4]]), + ({'j': None}, [self.objs[4]]), + ] + for value, expected in tests: + with self.subTest(value=value): + qs = NullableJSONModel.objects.filter(value__contains=value) + self.assertSequenceEqual(qs, expected) + + @skipUnlessDBFeature('supports_primitives_in_json_field') + def test_contains_primitives(self): + for value in self.primitives: + with self.subTest(value=value): + qs = NullableJSONModel.objects.filter(value__contains=value) + self.assertIs(qs.exists(), True) + + @skipIf( + connection.vendor == 'oracle', + "Oracle doesn't support contained_by lookup.", + ) + def test_contained_by(self): + qs = NullableJSONModel.objects.filter(value__contained_by={'a': 'b', 'c': 14, 'h': True}) + self.assertSequenceEqual(qs, self.objs[2:4]) + + @skipUnless( + connection.vendor == 'oracle', + "Oracle doesn't support contained_by lookup.", + ) + def test_contained_by_unsupported(self): + msg = 'contained_by lookup is not supported on Oracle.' + with self.assertRaisesMessage(NotSupportedError, msg): + NullableJSONModel.objects.filter(value__contained_by={'a': 'b'}).get() + + def test_deep_values(self): + qs = NullableJSONModel.objects.values_list('value__k__l') + expected_objs = [(None,)] * len(self.objs) + expected_objs[4] = ('m',) + self.assertSequenceEqual(qs, expected_objs) + + @skipUnlessDBFeature('can_distinct_on_fields') + def test_deep_distinct(self): + query = NullableJSONModel.objects.distinct('value__k__l').values_list('value__k__l') + self.assertSequenceEqual(query, [('m',), (None,)]) + + def test_isnull_key(self): + # key__isnull=False works the same as has_key='key'. + self.assertSequenceEqual( + NullableJSONModel.objects.filter(value__a__isnull=True), + self.objs[:3] + self.objs[5:], + ) + self.assertSequenceEqual( + NullableJSONModel.objects.filter(value__a__isnull=False), + [self.objs[3], self.objs[4]], + ) + self.assertSequenceEqual( + NullableJSONModel.objects.filter(value__j__isnull=False), + [self.objs[4]], + ) + + def test_isnull_key_or_none(self): + obj = NullableJSONModel.objects.create(value={'a': None}) + self.assertSequenceEqual( + NullableJSONModel.objects.filter(Q(value__a__isnull=True) | Q(value__a=None)), + self.objs[:3] + self.objs[5:] + [obj], + ) + + def test_none_key(self): + self.assertSequenceEqual( + NullableJSONModel.objects.filter(value__j=None), + [self.objs[4]], + ) + + def test_none_key_exclude(self): + obj = NullableJSONModel.objects.create(value={'j': 1}) + if connection.vendor == 'oracle': + # Oracle supports filtering JSON objects with NULL keys, but the + # current implementation doesn't support it. + self.assertSequenceEqual( + NullableJSONModel.objects.exclude(value__j=None), + self.objs[1:4] + self.objs[5:] + [obj], + ) + else: + self.assertSequenceEqual(NullableJSONModel.objects.exclude(value__j=None), [obj]) + + def test_shallow_list_lookup(self): + self.assertSequenceEqual( + NullableJSONModel.objects.filter(value__0=1), + [self.objs[5]], + ) + + def test_shallow_obj_lookup(self): + self.assertSequenceEqual( + NullableJSONModel.objects.filter(value__a='b'), + [self.objs[3], self.objs[4]], + ) + + def test_obj_subquery_lookup(self): + qs = NullableJSONModel.objects.annotate( + field=Subquery(NullableJSONModel.objects.filter(pk=OuterRef('pk')).values('value')), + ).filter(field__a='b') + self.assertSequenceEqual(qs, [self.objs[3], self.objs[4]]) + + def test_deep_lookup_objs(self): + self.assertSequenceEqual( + NullableJSONModel.objects.filter(value__k__l='m'), + [self.objs[4]], + ) + + def test_shallow_lookup_obj_target(self): + self.assertSequenceEqual( + NullableJSONModel.objects.filter(value__k={'l': 'm'}), + [self.objs[4]], + ) + + def test_deep_lookup_array(self): + self.assertSequenceEqual( + NullableJSONModel.objects.filter(value__1__0=2), + [self.objs[5]], + ) + + def test_deep_lookup_mixed(self): + self.assertSequenceEqual( + NullableJSONModel.objects.filter(value__d__1__f='g'), + [self.objs[4]], + ) + + def test_deep_lookup_transform(self): + self.assertSequenceEqual( + NullableJSONModel.objects.filter(value__c__gt=2), + [self.objs[3], self.objs[4]], + ) + self.assertSequenceEqual( + NullableJSONModel.objects.filter(value__c__gt=2.33), + [self.objs[3], self.objs[4]], + ) + self.assertIs(NullableJSONModel.objects.filter(value__c__lt=5).exists(), False) + + @skipIf( + connection.vendor == 'oracle', + 'Raises ORA-00600: internal error code on Oracle 18.', + ) + def test_usage_in_subquery(self): + self.assertSequenceEqual( + NullableJSONModel.objects.filter( + id__in=NullableJSONModel.objects.filter(value__c=14), + ), + self.objs[3:5], + ) + + def test_key_iexact(self): + self.assertIs(NullableJSONModel.objects.filter(value__foo__iexact='BaR').exists(), True) + self.assertIs(NullableJSONModel.objects.filter(value__foo__iexact='"BaR"').exists(), False) + + def test_key_contains(self): + self.assertIs(NullableJSONModel.objects.filter(value__foo__contains='ar').exists(), True) + + def test_key_icontains(self): + self.assertIs(NullableJSONModel.objects.filter(value__foo__icontains='Ar').exists(), True) + + def test_key_startswith(self): + self.assertIs(NullableJSONModel.objects.filter(value__foo__startswith='b').exists(), True) + + def test_key_istartswith(self): + self.assertIs(NullableJSONModel.objects.filter(value__foo__istartswith='B').exists(), True) + + def test_key_endswith(self): + self.assertIs(NullableJSONModel.objects.filter(value__foo__endswith='r').exists(), True) + + def test_key_iendswith(self): + self.assertIs(NullableJSONModel.objects.filter(value__foo__iendswith='R').exists(), True) + + def test_key_regex(self): + self.assertIs(NullableJSONModel.objects.filter(value__foo__regex=r'^bar$').exists(), True) + + def test_key_iregex(self): + self.assertIs(NullableJSONModel.objects.filter(value__foo__iregex=r'^bAr$').exists(), True) + + @skipUnless(connection.vendor == 'postgresql', 'kwargs are crafted for PostgreSQL.') + def test_key_sql_injection(self): + with CaptureQueriesContext(connection) as queries: + self.assertIs( + NullableJSONModel.objects.filter(**{ + """value__test' = '"a"') OR 1 = 1 OR ('d""": 'x', + }).exists(), + False, + ) + self.assertIn( + """."value" -> 'test'' = ''"a"'') OR 1 = 1 OR (''d') = '"x"' """, + queries[0]['sql'], + ) + + @skipIf(connection.vendor == 'postgresql', 'PostgreSQL uses operators not functions.') + def test_key_sql_injection_escape(self): + query = str(JSONModel.objects.filter(**{ + """value__test") = '"a"' OR 1 = 1 OR ("d""": 'x', + }).query) + self.assertIn('"test\\"', query) + self.assertIn('\\"d', query) + + def test_key_escape(self): + obj = NullableJSONModel.objects.create(value={'%total': 10}) + self.assertEqual(NullableJSONModel.objects.filter(**{'value__%total': 10}).get(), obj) + + def test_none_key_and_exact_lookup(self): + self.assertSequenceEqual( + NullableJSONModel.objects.filter(value__a='b', value__j=None), + [self.objs[4]], + ) + + def test_lookups_with_key_transform(self): + tests = ( + ('value__d__contains', 'e'), + ('value__baz__has_key', 'c'), + ('value__baz__has_keys', ['a', 'c']), + ('value__baz__has_any_keys', ['a', 'x']), + ('value__contains', KeyTransform('bax', 'value')), + ('value__has_key', KeyTextTransform('foo', 'value')), + ) + # contained_by lookup is not supported on Oracle. + if connection.vendor != 'oracle': + tests += ( + ('value__baz__contained_by', {'a': 'b', 'c': 'd', 'e': 'f'}), + ( + 'value__contained_by', + KeyTransform('x', RawSQL( + self.raw_sql, + ['{"x": {"a": "b", "c": 1, "d": "e"}}'], + )), + ), + ) + for lookup, value in tests: + with self.subTest(lookup=lookup): + self.assertIs(NullableJSONModel.objects.filter( + **{lookup: value}, + ).exists(), True) |
