diff options
| author | wookkl <wjddnr315@gmail.com> | 2025-03-12 00:53:04 +0900 |
|---|---|---|
| committer | Sarah Boyce <42296566+sarahboyce@users.noreply.github.com> | 2025-03-12 09:22:44 +0100 |
| commit | 2ae3044d9d4dfb8371055513e440e0384f211963 (patch) | |
| tree | 771c0a6340bd0eb8a05c608ae2919652b16ce876 /tests | |
| parent | 0ebea6e5c07485a36862e9b6e2be18d1694ad2c5 (diff) | |
Fixed #35945 -- Added async interface to Paginator.
Diffstat (limited to 'tests')
| -rw-r--r-- | tests/pagination/custom.py | 19 | ||||
| -rw-r--r-- | tests/pagination/tests.py | 500 |
2 files changed, 462 insertions, 57 deletions
diff --git a/tests/pagination/custom.py b/tests/pagination/custom.py index ea04083576..e1ffa905f1 100644 --- a/tests/pagination/custom.py +++ b/tests/pagination/custom.py @@ -1,4 +1,4 @@ -from django.core.paginator import Page, Paginator +from django.core.paginator import AsyncPage, AsyncPaginator, Page, Paginator class ValidAdjacentNumsPage(Page): @@ -16,3 +16,20 @@ class ValidAdjacentNumsPage(Page): class ValidAdjacentNumsPaginator(Paginator): def _get_page(self, *args, **kwargs): return ValidAdjacentNumsPage(*args, **kwargs) + + +class AsyncValidAdjacentNumsPage(AsyncPage): + async def anext_page_number(self): + if not await self.ahas_next(): + return None + return await super().anext_page_number() + + async def aprevious_page_number(self): + if not await self.ahas_previous(): + return None + return await super().aprevious_page_number() + + +class AsyncValidAdjacentNumsPaginator(AsyncPaginator): + def _get_page(self, *args, **kwargs): + return AsyncValidAdjacentNumsPage(*args, **kwargs) diff --git a/tests/pagination/tests.py b/tests/pagination/tests.py index cf7ec61d3b..ab2329f26f 100644 --- a/tests/pagination/tests.py +++ b/tests/pagination/tests.py @@ -1,9 +1,12 @@ import collections.abc +import inspect import unittest.mock import warnings from datetime import datetime from django.core.paginator import ( + AsyncPaginator, + BasePaginator, EmptyPage, InvalidPage, PageNotAnInteger, @@ -12,7 +15,7 @@ from django.core.paginator import ( ) from django.test import SimpleTestCase, TestCase -from .custom import ValidAdjacentNumsPaginator +from .custom import AsyncValidAdjacentNumsPaginator, ValidAdjacentNumsPaginator from .models import Article @@ -32,6 +35,13 @@ class PaginationTests(SimpleTestCase): self.check_attribute("num_pages", paginator, num_pages, params) self.check_attribute("page_range", paginator, page_range, params, coerce=list) + async def check_paginator_async(self, params, output): + """See check_paginator.""" + count, num_pages, page_range = output + paginator = AsyncPaginator(*params) + await self.check_attribute_async("acount", paginator, count, params) + await self.check_attribute_async("anum_pages", paginator, num_pages, params) + def check_attribute(self, name, paginator, expected, params, coerce=None): """ Helper method that checks a single attribute and gives a nice error @@ -47,14 +57,21 @@ class PaginationTests(SimpleTestCase): % (name, expected, got, params), ) - def test_paginator(self): - """ - Tests the paginator attributes using varying inputs. - """ + async def check_attribute_async(self, name, paginator, expected, params): + """See check_attribute.""" + got = getattr(paginator, name) + self.assertEqual( + expected, + await got(), + "For '%s', expected %s but got %s. Paginator parameters were: %s" + % (name, expected, got, params), + ) + + def get_test_cases_for_test_paginator(self): nine = [1, 2, 3, 4, 5, 6, 7, 8, 9] ten = nine + [10] eleven = ten + [11] - tests = ( + return ( # Each item is 2-tuple: # First tuple is Paginator parameters - object_list, per_page, # orphans, and allow_empty_first_page. @@ -111,9 +128,17 @@ class PaginationTests(SimpleTestCase): ((ten, 4, "1", False), (10, 3, [1, 2, 3])), ((ten, 4, "1", False), (10, 3, [1, 2, 3])), ) + + def test_paginator(self): + tests = self.get_test_cases_for_test_paginator() for params, output in tests: self.check_paginator(params, output) + async def test_paginator_async(self): + tests = self.get_test_cases_for_test_paginator() + for params, output in tests: + await self.check_paginator_async(params, output) + def test_invalid_page_number(self): """ Invalid page numbers result in the correct exception being raised. @@ -128,6 +153,12 @@ class PaginationTests(SimpleTestCase): with self.assertRaises(PageNotAnInteger): paginator.validate_number(1.2) + async def test_invalid_apage_number_async(self): + """See test_invalid_page_number.""" + paginator = AsyncPaginator([1, 2, 3], 2) + with self.assertRaises(InvalidPage): + await paginator.apage(3) + def test_error_messages(self): error_messages = { "invalid_page": "Wrong page number", @@ -186,6 +217,27 @@ class PaginationTests(SimpleTestCase): self.assertEqual(5, paginator.num_pages) self.assertEqual([1, 2, 3, 4, 5], list(paginator.page_range)) + async def test_paginate_misc_classes_async(self): + class CountContainer: + async def acount(self): + return 42 + + # AsyncPaginator can be passed other objects with an acount() method. + paginator = AsyncPaginator(CountContainer(), 10) + self.assertEqual(42, await paginator.acount()) + self.assertEqual(5, await paginator.anum_pages()) + self.assertEqual([1, 2, 3, 4, 5], list(await paginator.apage_range())) + + # AsyncPaginator can be passed other objects that implement __len__. + class LenContainer: + def __len__(self): + return 42 + + paginator = AsyncPaginator(LenContainer(), 10) + self.assertEqual(42, await paginator.acount()) + self.assertEqual(5, await paginator.anum_pages()) + self.assertEqual([1, 2, 3, 4, 5], list(await paginator.apage_range())) + def test_count_does_not_silence_attribute_error(self): class AttributeErrorContainer: def count(self): @@ -194,6 +246,14 @@ class PaginationTests(SimpleTestCase): with self.assertRaisesMessage(AttributeError, "abc"): Paginator(AttributeErrorContainer(), 10).count + async def test_acount_does_not_silence_attribute_error_async(self): + class AttributeErrorContainer: + async def acount(self): + raise AttributeError("abc") + + with self.assertRaisesMessage(AttributeError, "abc"): + await AsyncPaginator(AttributeErrorContainer(), 10).acount() + def test_count_does_not_silence_type_error(self): class TypeErrorContainer: def count(self): @@ -202,6 +262,14 @@ class PaginationTests(SimpleTestCase): with self.assertRaisesMessage(TypeError, "abc"): Paginator(TypeErrorContainer(), 10).count + async def test_acount_does_not_silence_type_error_async(self): + class TypeErrorContainer: + async def acount(self): + raise TypeError("abc") + + with self.assertRaisesMessage(TypeError, "abc"): + await AsyncPaginator(TypeErrorContainer(), 10).acount() + def check_indexes(self, params, page_num, indexes): """ Helper method that instantiates a Paginator object from the passed @@ -227,12 +295,30 @@ class PaginationTests(SimpleTestCase): msg % ("end index", page_num, end, page.end_index(), params), ) - def test_page_indexes(self): - """ - Paginator pages have the correct start and end indexes. - """ + async def check_indexes_async(self, params, page_num, indexes): + """See check_indexes.""" + paginator = AsyncPaginator(*params) + if page_num == "first": + page_num = 1 + elif page_num == "last": + page_num = await paginator.anum_pages() + page = await paginator.apage(page_num) + start, end = indexes + msg = "For %s of page %s, expected %s but got %s. Paginator parameters were: %s" + self.assertEqual( + start, + await page.astart_index(), + msg % ("start index", page_num, start, await page.astart_index(), params), + ) + self.assertEqual( + end, + await page.aend_index(), + msg % ("end index", page_num, end, await page.aend_index(), params), + ) + + def get_test_cases_for_test_page_indexes(self): ten = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] - tests = ( + return ( # Each item is 3-tuple: # First tuple is Paginator parameters - object_list, per_page, # orphans, and allow_empty_first_page. @@ -265,6 +351,12 @@ class PaginationTests(SimpleTestCase): (([], 4, 1, True), (0, 0), (0, 0)), (([], 4, 2, True), (0, 0), (0, 0)), ) + + def test_page_indexes(self): + """ + Paginator pages have the correct start and end indexes. + """ + tests = self.get_test_cases_for_test_page_indexes() for params, first, last in tests: self.check_indexes(params, "first", first) self.check_indexes(params, "last", last) @@ -277,6 +369,21 @@ class PaginationTests(SimpleTestCase): with self.assertRaises(EmptyPage): self.check_indexes(([], 4, 2, False), 1, None) + async def test_page_indexes_async(self): + """See test_page_indexes""" + tests = self.get_test_cases_for_test_page_indexes() + for params, first, last in tests: + await self.check_indexes_async(params, "first", first) + await self.check_indexes_async(params, "last", last) + + # When no items and no empty first page, we should get EmptyPage error. + with self.assertRaises(EmptyPage): + await self.check_indexes_async(([], 4, 0, False), 1, None) + with self.assertRaises(EmptyPage): + await self.check_indexes_async(([], 4, 1, False), 1, None) + with self.assertRaises(EmptyPage): + await self.check_indexes_async(([], 4, 2, False), 1, None) + def test_page_sequence(self): """ A paginator page acts like a standard sequence. @@ -289,6 +396,16 @@ class PaginationTests(SimpleTestCase): self.assertEqual("".join(page2), "fghijk") self.assertEqual("".join(reversed(page2)), "kjihgf") + async def test_page_sequence_async(self): + eleven = "abcdefghijk" + page2 = await AsyncPaginator(eleven, per_page=5, orphans=1).apage(2) + await page2.aget_object_list() + self.assertEqual(len(page2), 6) + self.assertIn("k", page2) + self.assertNotIn("a", page2) + self.assertEqual("".join(page2), "fghijk") + self.assertEqual("".join(reversed(page2)), "kjihgf") + def test_get_page_hook(self): """ A Paginator subclass can use the ``_get_page`` hook to @@ -303,6 +420,20 @@ class PaginationTests(SimpleTestCase): self.assertEqual(page2.previous_page_number(), 1) self.assertIsNone(page2.next_page_number()) + async def test_get_page_hook_async(self): + """ + An AsyncPaginator subclass can use the ``_get_page`` hook to + return an alternative to the standard AsyncPage class. + """ + eleven = "abcdefghijk" + paginator = AsyncValidAdjacentNumsPaginator(eleven, per_page=6) + page1 = await paginator.apage(1) + page2 = await paginator.apage(2) + self.assertIsNone(await page1.aprevious_page_number()) + self.assertEqual(await page1.anext_page_number(), 2) + self.assertEqual(await page2.aprevious_page_number(), 1) + self.assertIsNone(await page2.anext_page_number()) + def test_page_range_iterator(self): """ Paginator.page_range should be an iterator. @@ -323,6 +454,20 @@ class PaginationTests(SimpleTestCase): # Non-integer page returns the first page. self.assertEqual(paginator.get_page(None).number, 1) + async def test_aget_page_async(self): + """ + AsyncPaginator.aget_page() returns a valid page even with invalid page + arguments. + """ + paginator = AsyncPaginator([1, 2, 3], 2) + page = await paginator.aget_page(1) + self.assertEqual(page.number, 1) + self.assertEqual(page.object_list, [1, 2]) + # An empty page returns the last page. + self.assertEqual((await paginator.aget_page(3)).number, 2) + # Non-integer page returns the first page. + self.assertEqual((await paginator.aget_page(None)).number, 1) + def test_get_page_empty_object_list(self): """Paginator.get_page() with an empty object_list.""" paginator = Paginator([], 2) @@ -332,6 +477,15 @@ class PaginationTests(SimpleTestCase): # Non-integer page returns the first page. self.assertEqual(paginator.get_page(None).number, 1) + async def test_aget_page_empty_object_list_async(self): + """AsyncPaginator.aget_page() with an empty object_list.""" + paginator = AsyncPaginator([], 2) + # An empty page returns the last page. + self.assertEqual((await paginator.aget_page(1)).number, 1) + self.assertEqual((await paginator.aget_page(2)).number, 1) + # Non-integer page returns the first page. + self.assertEqual((await paginator.aget_page(None)).number, 1) + def test_get_page_empty_object_list_and_allow_empty_first_page_false(self): """ Paginator.get_page() raises EmptyPage if allow_empty_first_page=False @@ -341,6 +495,17 @@ class PaginationTests(SimpleTestCase): with self.assertRaises(EmptyPage): paginator.get_page(1) + async def test_aget_page_empty_obj_list_and_allow_empty_first_page_false_async( + self, + ): + """ + AsyncPaginator.aget_page() raises EmptyPage if allow_empty_first_page=False + and object_list is empty. + """ + paginator = AsyncPaginator([], 2, allow_empty_first_page=False) + with self.assertRaises(EmptyPage): + await paginator.aget_page(1) + def test_paginator_iteration(self): paginator = Paginator([1, 2, 3], 2) page_iterator = iter(paginator) @@ -353,6 +518,66 @@ class PaginationTests(SimpleTestCase): ["<Page 1 of 2>", "<Page 2 of 2>"], ) + async def test_paginator_iteration_async(self): + paginator = AsyncPaginator([1, 2, 3], 2) + page_iterator = aiter(paginator) + for page, expected in enumerate(([1, 2], [3]), start=1): + with self.subTest(page=page): + async_page = await anext(page_iterator) + self.assertEqual(expected, [obj async for obj in async_page]) + self.assertEqual( + [str(page) async for page in aiter(paginator)], + ["<Async Page 1>", "<Async Page 2>"], + ) + + def get_test_cases_for_test_get_elided_page_range(self): + ELLIPSIS = Paginator.ELLIPSIS + return [ + # on_each_side=2, on_ends=1 + (1, 2, 1, [1, 2, 3, ELLIPSIS, 50]), + (4, 2, 1, [1, 2, 3, 4, 5, 6, ELLIPSIS, 50]), + (5, 2, 1, [1, 2, 3, 4, 5, 6, 7, ELLIPSIS, 50]), + (6, 2, 1, [1, ELLIPSIS, 4, 5, 6, 7, 8, ELLIPSIS, 50]), + (45, 2, 1, [1, ELLIPSIS, 43, 44, 45, 46, 47, ELLIPSIS, 50]), + (46, 2, 1, [1, ELLIPSIS, 44, 45, 46, 47, 48, 49, 50]), + (47, 2, 1, [1, ELLIPSIS, 45, 46, 47, 48, 49, 50]), + (50, 2, 1, [1, ELLIPSIS, 48, 49, 50]), + # on_each_side=1, on_ends=3 + (1, 1, 3, [1, 2, ELLIPSIS, 48, 49, 50]), + (5, 1, 3, [1, 2, 3, 4, 5, 6, ELLIPSIS, 48, 49, 50]), + (6, 1, 3, [1, 2, 3, 4, 5, 6, 7, ELLIPSIS, 48, 49, 50]), + (7, 1, 3, [1, 2, 3, ELLIPSIS, 6, 7, 8, ELLIPSIS, 48, 49, 50]), + (44, 1, 3, [1, 2, 3, ELLIPSIS, 43, 44, 45, ELLIPSIS, 48, 49, 50]), + (45, 1, 3, [1, 2, 3, ELLIPSIS, 44, 45, 46, 47, 48, 49, 50]), + (46, 1, 3, [1, 2, 3, ELLIPSIS, 45, 46, 47, 48, 49, 50]), + (50, 1, 3, [1, 2, 3, ELLIPSIS, 49, 50]), + # on_each_side=4, on_ends=0 + (1, 4, 0, [1, 2, 3, 4, 5, ELLIPSIS]), + (5, 4, 0, [1, 2, 3, 4, 5, 6, 7, 8, 9, ELLIPSIS]), + (6, 4, 0, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, ELLIPSIS]), + (7, 4, 0, [ELLIPSIS, 3, 4, 5, 6, 7, 8, 9, 10, 11, ELLIPSIS]), + (44, 4, 0, [ELLIPSIS, 40, 41, 42, 43, 44, 45, 46, 47, 48, ELLIPSIS]), + (45, 4, 0, [ELLIPSIS, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50]), + (46, 4, 0, [ELLIPSIS, 42, 43, 44, 45, 46, 47, 48, 49, 50]), + (50, 4, 0, [ELLIPSIS, 46, 47, 48, 49, 50]), + # on_each_side=0, on_ends=1 + (1, 0, 1, [1, ELLIPSIS, 50]), + (2, 0, 1, [1, 2, ELLIPSIS, 50]), + (3, 0, 1, [1, 2, 3, ELLIPSIS, 50]), + (4, 0, 1, [1, ELLIPSIS, 4, ELLIPSIS, 50]), + (47, 0, 1, [1, ELLIPSIS, 47, ELLIPSIS, 50]), + (48, 0, 1, [1, ELLIPSIS, 48, 49, 50]), + (49, 0, 1, [1, ELLIPSIS, 49, 50]), + (50, 0, 1, [1, ELLIPSIS, 50]), + # on_each_side=0, on_ends=0 + (1, 0, 0, [1, ELLIPSIS]), + (2, 0, 0, [1, 2, ELLIPSIS]), + (3, 0, 0, [ELLIPSIS, 3, ELLIPSIS]), + (48, 0, 0, [ELLIPSIS, 48, ELLIPSIS]), + (49, 0, 0, [ELLIPSIS, 49, 50]), + (50, 0, 0, [ELLIPSIS, 50]), + ] + def test_get_elided_page_range(self): # Paginator.validate_number() must be called: paginator = Paginator([1, 2, 3], 2) @@ -426,51 +651,7 @@ class PaginationTests(SimpleTestCase): self.assertIn(ELLIPSIS, page_range) # Range should be elided if enough pages when using custom arguments: - tests = [ - # on_each_side=2, on_ends=1 - (1, 2, 1, [1, 2, 3, ELLIPSIS, 50]), - (4, 2, 1, [1, 2, 3, 4, 5, 6, ELLIPSIS, 50]), - (5, 2, 1, [1, 2, 3, 4, 5, 6, 7, ELLIPSIS, 50]), - (6, 2, 1, [1, ELLIPSIS, 4, 5, 6, 7, 8, ELLIPSIS, 50]), - (45, 2, 1, [1, ELLIPSIS, 43, 44, 45, 46, 47, ELLIPSIS, 50]), - (46, 2, 1, [1, ELLIPSIS, 44, 45, 46, 47, 48, 49, 50]), - (47, 2, 1, [1, ELLIPSIS, 45, 46, 47, 48, 49, 50]), - (50, 2, 1, [1, ELLIPSIS, 48, 49, 50]), - # on_each_side=1, on_ends=3 - (1, 1, 3, [1, 2, ELLIPSIS, 48, 49, 50]), - (5, 1, 3, [1, 2, 3, 4, 5, 6, ELLIPSIS, 48, 49, 50]), - (6, 1, 3, [1, 2, 3, 4, 5, 6, 7, ELLIPSIS, 48, 49, 50]), - (7, 1, 3, [1, 2, 3, ELLIPSIS, 6, 7, 8, ELLIPSIS, 48, 49, 50]), - (44, 1, 3, [1, 2, 3, ELLIPSIS, 43, 44, 45, ELLIPSIS, 48, 49, 50]), - (45, 1, 3, [1, 2, 3, ELLIPSIS, 44, 45, 46, 47, 48, 49, 50]), - (46, 1, 3, [1, 2, 3, ELLIPSIS, 45, 46, 47, 48, 49, 50]), - (50, 1, 3, [1, 2, 3, ELLIPSIS, 49, 50]), - # on_each_side=4, on_ends=0 - (1, 4, 0, [1, 2, 3, 4, 5, ELLIPSIS]), - (5, 4, 0, [1, 2, 3, 4, 5, 6, 7, 8, 9, ELLIPSIS]), - (6, 4, 0, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, ELLIPSIS]), - (7, 4, 0, [ELLIPSIS, 3, 4, 5, 6, 7, 8, 9, 10, 11, ELLIPSIS]), - (44, 4, 0, [ELLIPSIS, 40, 41, 42, 43, 44, 45, 46, 47, 48, ELLIPSIS]), - (45, 4, 0, [ELLIPSIS, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50]), - (46, 4, 0, [ELLIPSIS, 42, 43, 44, 45, 46, 47, 48, 49, 50]), - (50, 4, 0, [ELLIPSIS, 46, 47, 48, 49, 50]), - # on_each_side=0, on_ends=1 - (1, 0, 1, [1, ELLIPSIS, 50]), - (2, 0, 1, [1, 2, ELLIPSIS, 50]), - (3, 0, 1, [1, 2, 3, ELLIPSIS, 50]), - (4, 0, 1, [1, ELLIPSIS, 4, ELLIPSIS, 50]), - (47, 0, 1, [1, ELLIPSIS, 47, ELLIPSIS, 50]), - (48, 0, 1, [1, ELLIPSIS, 48, 49, 50]), - (49, 0, 1, [1, ELLIPSIS, 49, 50]), - (50, 0, 1, [1, ELLIPSIS, 50]), - # on_each_side=0, on_ends=0 - (1, 0, 0, [1, ELLIPSIS]), - (2, 0, 0, [1, 2, ELLIPSIS]), - (3, 0, 0, [ELLIPSIS, 3, ELLIPSIS]), - (48, 0, 0, [ELLIPSIS, 48, ELLIPSIS]), - (49, 0, 0, [ELLIPSIS, 49, 50]), - (50, 0, 0, [ELLIPSIS, 50]), - ] + tests = self.get_test_cases_for_test_get_elided_page_range() paginator = Paginator(range(5000), 100) for number, on_each_side, on_ends, expected in tests: with self.subTest( @@ -484,6 +665,94 @@ class PaginationTests(SimpleTestCase): self.assertIsInstance(page_range, collections.abc.Generator) self.assertEqual(list(page_range), expected) + async def test_aget_elided_page_range_async(self): + # AsyncPaginator.avalidate_number() must be called: + paginator = AsyncPaginator([1, 2, 3], 2) + with unittest.mock.patch.object(paginator, "avalidate_number") as mock: + mock.assert_not_called() + [p async for p in paginator.aget_elided_page_range(2)] + mock.assert_called_with(2) + + ELLIPSIS = Paginator.ELLIPSIS + + # Range is not elided if not enough pages when using default arguments: + paginator = AsyncPaginator(range(10 * 100), 100) + page_range = paginator.aget_elided_page_range(1) + self.assertIsInstance(page_range, collections.abc.AsyncGenerator) + self.assertNotIn(ELLIPSIS, [p async for p in page_range]) + paginator = AsyncPaginator(range(10 * 100 + 1), 100) + self.assertIsInstance(page_range, collections.abc.AsyncGenerator) + page_range = paginator.aget_elided_page_range(1) + self.assertIn(ELLIPSIS, [p async for p in page_range]) + + # Range should be elided if enough pages when using default arguments: + tests = [ + # on_each_side=3, on_ends=2 + (1, [1, 2, 3, 4, ELLIPSIS, 49, 50]), + (6, [1, 2, 3, 4, 5, 6, 7, 8, 9, ELLIPSIS, 49, 50]), + (7, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, ELLIPSIS, 49, 50]), + (8, [1, 2, ELLIPSIS, 5, 6, 7, 8, 9, 10, 11, ELLIPSIS, 49, 50]), + (43, [1, 2, ELLIPSIS, 40, 41, 42, 43, 44, 45, 46, ELLIPSIS, 49, 50]), + (44, [1, 2, ELLIPSIS, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50]), + (45, [1, 2, ELLIPSIS, 42, 43, 44, 45, 46, 47, 48, 49, 50]), + (50, [1, 2, ELLIPSIS, 47, 48, 49, 50]), + ] + paginator = AsyncPaginator(range(5000), 100) + for number, expected in tests: + with self.subTest(number=number): + page_range = paginator.aget_elided_page_range(number) + self.assertIsInstance(page_range, collections.abc.AsyncGenerator) + self.assertEqual([p async for p in page_range], expected) + + # Range is not elided if not enough pages when using custom arguments: + tests = [ + (6, 2, 1, 1), + (8, 1, 3, 1), + (8, 4, 0, 1), + (4, 1, 1, 1), + # When on_each_side and on_ends are both <= 1 but not both == 1 it + # is a special case where the range is not elided until an extra + # page is added. + (2, 0, 1, 2), + (2, 1, 0, 2), + (1, 0, 0, 2), + ] + for pages, on_each_side, on_ends, elided_after in tests: + for offset in range(elided_after + 1): + with self.subTest( + pages=pages, + offset=elided_after, + on_each_side=on_each_side, + on_ends=on_ends, + ): + paginator = AsyncPaginator(range((pages + offset) * 100), 100) + page_range = paginator.aget_elided_page_range( + 1, + on_each_side=on_each_side, + on_ends=on_ends, + ) + self.assertIsInstance(page_range, collections.abc.AsyncGenerator) + page_list = [p async for p in page_range] + if offset < elided_after: + self.assertNotIn(ELLIPSIS, page_list) + else: + self.assertIn(ELLIPSIS, page_list) + + # Range should be elided if enough pages when using custom arguments: + tests = self.get_test_cases_for_test_get_elided_page_range() + paginator = AsyncPaginator(range(5000), 100) + for number, on_each_side, on_ends, expected in tests: + with self.subTest( + number=number, on_each_side=on_each_side, on_ends=on_ends + ): + page_range = paginator.aget_elided_page_range( + number, + on_each_side=on_each_side, + on_ends=on_ends, + ) + self.assertIsInstance(page_range, collections.abc.AsyncGenerator) + self.assertEqual([p async for p in page_range], expected) + class ModelPaginationTests(TestCase): """ @@ -513,6 +782,21 @@ class ModelPaginationTests(TestCase): self.assertEqual(1, p.start_index()) self.assertEqual(5, p.end_index()) + async def test_first_page_async(self): + paginator = AsyncPaginator(Article.objects.order_by("id"), 5) + p = await paginator.apage(1) + self.assertEqual("<Async Page 1>", str(p)) + object_list = await p.aget_object_list() + self.assertSequenceEqual(object_list, self.articles[:5]) + self.assertTrue(await p.ahas_next()) + self.assertFalse(await p.ahas_previous()) + self.assertTrue(await p.ahas_other_pages()) + self.assertEqual(2, await p.anext_page_number()) + with self.assertRaises(InvalidPage): + await p.aprevious_page_number() + self.assertEqual(1, await p.astart_index()) + self.assertEqual(5, await p.aend_index()) + def test_last_page(self): paginator = Paginator(Article.objects.order_by("id"), 5) p = paginator.page(2) @@ -527,6 +811,21 @@ class ModelPaginationTests(TestCase): self.assertEqual(6, p.start_index()) self.assertEqual(9, p.end_index()) + async def test_last_page_async(self): + paginator = AsyncPaginator(Article.objects.order_by("id"), 5) + p = await paginator.apage(2) + self.assertEqual("<Async Page 2>", str(p)) + object_list = await p.aget_object_list() + self.assertSequenceEqual(object_list, self.articles[5:]) + self.assertFalse(await p.ahas_next()) + self.assertTrue(await p.ahas_previous()) + self.assertTrue(await p.ahas_other_pages()) + with self.assertRaises(InvalidPage): + await p.anext_page_number() + self.assertEqual(1, await p.aprevious_page_number()) + self.assertEqual(6, await p.astart_index()) + self.assertEqual(9, await p.aend_index()) + def test_page_getitem(self): """ Tests proper behavior of a paginator page __getitem__ (queryset @@ -551,6 +850,24 @@ class ModelPaginationTests(TestCase): # After __getitem__ is called, object_list is a list self.assertIsInstance(p.object_list, list) + async def test_page_getitem_async(self): + paginator = AsyncPaginator(Article.objects.order_by("id"), 5) + p = await paginator.apage(1) + + msg = "AsyncPage indices must be integers or slices, not str." + with self.assertRaisesMessage(TypeError, msg): + p["has_previous"] + + self.assertIsNone(p.object_list._result_cache) + + self.assertNotIsInstance(p.object_list, list) + + await p.aget_object_list() + + self.assertEqual(p[0], self.articles[0]) + self.assertSequenceEqual(p[slice(2)], self.articles[:2]) + self.assertIsInstance(p.object_list, list) + def test_paginating_unordered_queryset_raises_warning(self): msg = ( "Pagination may yield inconsistent results with an unordered " @@ -562,11 +879,27 @@ class ModelPaginationTests(TestCase): # is appropriate). self.assertEqual(cm.filename, __file__) + async def test_paginating_unordered_queryset_raises_warning_async(self): + msg = ( + "Pagination may yield inconsistent results with an unordered " + "object_list: <class 'pagination.models.Article'> QuerySet." + ) + with self.assertWarnsMessage(UnorderedObjectListWarning, msg) as cm: + AsyncPaginator(Article.objects.all(), 5) + # The warning points at the BasePaginator caller. + # The reason is that the UnorderedObjectListWarning occurs in BasePaginator. + self.assertEqual(cm.filename, inspect.getfile(BasePaginator)) + def test_paginating_empty_queryset_does_not_warn(self): with warnings.catch_warnings(record=True) as recorded: Paginator(Article.objects.none(), 5) self.assertEqual(len(recorded), 0) + async def test_paginating_empty_queryset_does_not_warn_async(self): + with warnings.catch_warnings(record=True) as recorded: + AsyncPaginator(Article.objects.none(), 5) + self.assertEqual(len(recorded), 0) + def test_paginating_unordered_object_list_raises_warning(self): """ Unordered object list warning with an object that has an ordered @@ -583,3 +916,58 @@ class ModelPaginationTests(TestCase): ) with self.assertWarnsMessage(UnorderedObjectListWarning, msg): Paginator(object_list, 5) + + async def test_paginating_unordered_object_list_raises_warning_async(self): + """ + See test_paginating_unordered_object_list_raises_warning. + """ + + class ObjectList: + ordered = False + + object_list = ObjectList() + msg = ( + "Pagination may yield inconsistent results with an unordered " + "object_list: {!r}.".format(object_list) + ) + with self.assertWarnsMessage(UnorderedObjectListWarning, msg): + AsyncPaginator(object_list, 5) + + async def test_async_page_object_list_raises_type_error_before_await(self): + paginator = AsyncPaginator(Article.objects.order_by("id"), 5) + p = await paginator.apage(1) + + with self.subTest(func="len"): + msg = "AsyncPage.aget_object_list() must be awaited before calling len()." + with self.assertRaisesMessage(TypeError, msg): + len(p) + + with self.subTest(func="reversed"): + msg = ( + "AsyncPage.aget_object_list() must be awaited before calling " + "reversed()." + ) + with self.assertRaisesMessage(TypeError, msg): + reversed(p) + + with self.subTest(func="index"): + msg = "AsyncPage.aget_object_list() must be awaited before using indexing." + with self.assertRaisesMessage(TypeError, msg): + p[0] + + async def test_async_page_aiteration(self): + paginator = AsyncPaginator(Article.objects.order_by("id"), 5) + p = await paginator.apage(1) + object_list = [obj async for obj in p] + self.assertEqual(len(object_list), 5) + + async def test_aget_object_list(self): + paginator = AsyncPaginator(Article.objects.order_by("id"), 5) + p = await paginator.apage(1) + + # object_list queryset is converted to list. + first_called_objs = await p.aget_object_list() + self.assertIsInstance(first_called_objs, list) + # It returns the same list that was converted on the first call. + second_called_objs = await p.aget_object_list() + self.assertEqual(id(first_called_objs), id(second_called_objs)) |
