summaryrefslogtreecommitdiff
path: root/tests/inspectdb
diff options
context:
space:
mode:
authorBaptiste Mispelon <bmispelon@gmail.com>2025-04-28 20:14:09 +0200
committernessita <124304+nessita@users.noreply.github.com>2025-04-29 13:45:05 -0300
commit2722cb61ccae84f593e6d2c28814e3c628743994 (patch)
tree9daabf26667c4b792bf53d9d1c1f14934f1b8659 /tests/inspectdb
parent65bbdbd10b25700d1166b1a698e672a4695281bc (diff)
Used addCleanup() instead of try-finally blocks in inspectdb tests.
Diffstat (limited to 'tests/inspectdb')
-rw-r--r--tests/inspectdb/tests.py305
1 files changed, 143 insertions, 162 deletions
diff --git a/tests/inspectdb/tests.py b/tests/inspectdb/tests.py
index 131bd45ce8..f7a6958b75 100644
--- a/tests/inspectdb/tests.py
+++ b/tests/inspectdb/tests.py
@@ -29,6 +29,17 @@ def special_table_only(table_name):
return table_name.startswith("inspectdb_special")
+def cursor_execute(*queries):
+ """
+ Execute SQL queries using a new cursor on the default database connection.
+ """
+ results = []
+ with connection.cursor() as cursor:
+ for q in queries:
+ results.append(cursor.execute(q))
+ return results
+
+
class InspectDBTestCase(TestCase):
unique_re = re.compile(r".*unique_together = \((.+),\).*")
@@ -415,29 +426,21 @@ class InspectDBTestCase(TestCase):
@skipUnless(connection.vendor == "postgresql", "PostgreSQL specific SQL")
def test_unsupported_unique_together(self):
"""Unsupported index types (COALESCE here) are skipped."""
- with connection.cursor() as c:
- c.execute(
- "CREATE UNIQUE INDEX Findex ON %s "
- "(id, people_unique_id, COALESCE(message_id, -1))"
- % PeopleMoreData._meta.db_table
- )
- try:
- out = StringIO()
- call_command(
- "inspectdb",
- table_name_filter=lambda tn: tn.startswith(
- PeopleMoreData._meta.db_table
- ),
- stdout=out,
- )
- output = out.getvalue()
- self.assertIn("# A unique constraint could not be introspected.", output)
- self.assertEqual(
- self.unique_re.findall(output), ["('id', 'people_unique')"]
- )
- finally:
- with connection.cursor() as c:
- c.execute("DROP INDEX Findex")
+ cursor_execute(
+ "CREATE UNIQUE INDEX Findex ON %s "
+ "(id, people_unique_id, COALESCE(message_id, -1))"
+ % PeopleMoreData._meta.db_table
+ )
+ self.addCleanup(cursor_execute, "DROP INDEX Findex")
+ out = StringIO()
+ call_command(
+ "inspectdb",
+ table_name_filter=lambda tn: tn.startswith(PeopleMoreData._meta.db_table),
+ stdout=out,
+ )
+ output = out.getvalue()
+ self.assertIn("# A unique constraint could not be introspected.", output)
+ self.assertEqual(self.unique_re.findall(output), ["('id', 'people_unique')"])
@skipUnless(
connection.vendor == "sqlite",
@@ -492,178 +495,156 @@ class InspectDBTransactionalTests(TransactionTestCase):
def test_include_views(self):
"""inspectdb --include-views creates models for database views."""
- with connection.cursor() as cursor:
- cursor.execute(
- "CREATE VIEW inspectdb_people_view AS "
- "SELECT id, name FROM inspectdb_people"
- )
+ cursor_execute(
+ "CREATE VIEW inspectdb_people_view AS "
+ "SELECT id, name FROM inspectdb_people"
+ )
+ self.addCleanup(cursor_execute, "DROP VIEW inspectdb_people_view")
out = StringIO()
view_model = "class InspectdbPeopleView(models.Model):"
view_managed = "managed = False # Created from a view."
- try:
- call_command(
- "inspectdb",
- table_name_filter=inspectdb_views_only,
- stdout=out,
- )
- no_views_output = out.getvalue()
- self.assertNotIn(view_model, no_views_output)
- self.assertNotIn(view_managed, no_views_output)
- call_command(
- "inspectdb",
- table_name_filter=inspectdb_views_only,
- include_views=True,
- stdout=out,
- )
- with_views_output = out.getvalue()
- self.assertIn(view_model, with_views_output)
- self.assertIn(view_managed, with_views_output)
- finally:
- with connection.cursor() as cursor:
- cursor.execute("DROP VIEW inspectdb_people_view")
+ call_command(
+ "inspectdb",
+ table_name_filter=inspectdb_views_only,
+ stdout=out,
+ )
+ no_views_output = out.getvalue()
+ self.assertNotIn(view_model, no_views_output)
+ self.assertNotIn(view_managed, no_views_output)
+ call_command(
+ "inspectdb",
+ table_name_filter=inspectdb_views_only,
+ include_views=True,
+ stdout=out,
+ )
+ with_views_output = out.getvalue()
+ self.assertIn(view_model, with_views_output)
+ self.assertIn(view_managed, with_views_output)
@skipUnlessDBFeature("can_introspect_materialized_views")
def test_include_materialized_views(self):
"""inspectdb --include-views creates models for materialized views."""
- with connection.cursor() as cursor:
- cursor.execute(
- "CREATE MATERIALIZED VIEW inspectdb_people_materialized AS "
- "SELECT id, name FROM inspectdb_people"
- )
+ cursor_execute(
+ "CREATE MATERIALIZED VIEW inspectdb_people_materialized AS "
+ "SELECT id, name FROM inspectdb_people"
+ )
+ self.addCleanup(
+ cursor_execute, "DROP MATERIALIZED VIEW inspectdb_people_materialized"
+ )
out = StringIO()
view_model = "class InspectdbPeopleMaterialized(models.Model):"
view_managed = "managed = False # Created from a view."
- try:
- call_command(
- "inspectdb",
- table_name_filter=inspectdb_views_only,
- stdout=out,
- )
- no_views_output = out.getvalue()
- self.assertNotIn(view_model, no_views_output)
- self.assertNotIn(view_managed, no_views_output)
- call_command(
- "inspectdb",
- table_name_filter=inspectdb_views_only,
- include_views=True,
- stdout=out,
- )
- with_views_output = out.getvalue()
- self.assertIn(view_model, with_views_output)
- self.assertIn(view_managed, with_views_output)
- finally:
- with connection.cursor() as cursor:
- cursor.execute("DROP MATERIALIZED VIEW inspectdb_people_materialized")
+ call_command(
+ "inspectdb",
+ table_name_filter=inspectdb_views_only,
+ stdout=out,
+ )
+ no_views_output = out.getvalue()
+ self.assertNotIn(view_model, no_views_output)
+ self.assertNotIn(view_managed, no_views_output)
+ call_command(
+ "inspectdb",
+ table_name_filter=inspectdb_views_only,
+ include_views=True,
+ stdout=out,
+ )
+ with_views_output = out.getvalue()
+ self.assertIn(view_model, with_views_output)
+ self.assertIn(view_managed, with_views_output)
@skipUnless(connection.vendor == "postgresql", "PostgreSQL specific SQL")
def test_include_partitions(self):
"""inspectdb --include-partitions creates models for partitions."""
- with connection.cursor() as cursor:
- cursor.execute(
- """\
- CREATE TABLE inspectdb_partition_parent (name text not null)
- PARTITION BY LIST (left(upper(name), 1))
+ cursor_execute(
"""
- )
- cursor.execute(
- """\
- CREATE TABLE inspectdb_partition_child
- PARTITION OF inspectdb_partition_parent
- FOR VALUES IN ('A', 'B', 'C')
+ CREATE TABLE inspectdb_partition_parent (name text not null)
+ PARTITION BY LIST (left(upper(name), 1))
+ """,
"""
- )
+ CREATE TABLE inspectdb_partition_child
+ PARTITION OF inspectdb_partition_parent
+ FOR VALUES IN ('A', 'B', 'C')
+ """,
+ )
+ self.addCleanup(
+ cursor_execute,
+ "DROP TABLE IF EXISTS inspectdb_partition_child",
+ "DROP TABLE IF EXISTS inspectdb_partition_parent",
+ )
out = StringIO()
partition_model_parent = "class InspectdbPartitionParent(models.Model):"
partition_model_child = "class InspectdbPartitionChild(models.Model):"
partition_managed = "managed = False # Created from a partition."
- try:
- call_command(
- "inspectdb", table_name_filter=inspectdb_tables_only, stdout=out
- )
- no_partitions_output = out.getvalue()
- self.assertIn(partition_model_parent, no_partitions_output)
- self.assertNotIn(partition_model_child, no_partitions_output)
- self.assertNotIn(partition_managed, no_partitions_output)
- call_command(
- "inspectdb",
- table_name_filter=inspectdb_tables_only,
- include_partitions=True,
- stdout=out,
- )
- with_partitions_output = out.getvalue()
- self.assertIn(partition_model_parent, with_partitions_output)
- self.assertIn(partition_model_child, with_partitions_output)
- self.assertIn(partition_managed, with_partitions_output)
- finally:
- with connection.cursor() as cursor:
- cursor.execute("DROP TABLE IF EXISTS inspectdb_partition_child")
- cursor.execute("DROP TABLE IF EXISTS inspectdb_partition_parent")
+ call_command("inspectdb", table_name_filter=inspectdb_tables_only, stdout=out)
+ no_partitions_output = out.getvalue()
+ self.assertIn(partition_model_parent, no_partitions_output)
+ self.assertNotIn(partition_model_child, no_partitions_output)
+ self.assertNotIn(partition_managed, no_partitions_output)
+ call_command(
+ "inspectdb",
+ table_name_filter=inspectdb_tables_only,
+ include_partitions=True,
+ stdout=out,
+ )
+ with_partitions_output = out.getvalue()
+ self.assertIn(partition_model_parent, with_partitions_output)
+ self.assertIn(partition_model_child, with_partitions_output)
+ self.assertIn(partition_managed, with_partitions_output)
@skipUnless(connection.vendor == "postgresql", "PostgreSQL specific SQL")
def test_foreign_data_wrapper(self):
- with connection.cursor() as cursor:
- cursor.execute("CREATE EXTENSION IF NOT EXISTS file_fdw")
- cursor.execute(
- "CREATE SERVER inspectdb_server FOREIGN DATA WRAPPER file_fdw"
- )
- cursor.execute(
- """
- CREATE FOREIGN TABLE inspectdb_iris_foreign_table (
- petal_length real,
- petal_width real,
- sepal_length real,
- sepal_width real
- ) SERVER inspectdb_server OPTIONS (
- program 'echo 1,2,3,4',
- format 'csv'
- )
- """
+ cursor_execute(
+ "CREATE EXTENSION IF NOT EXISTS file_fdw",
+ "CREATE SERVER inspectdb_server FOREIGN DATA WRAPPER file_fdw",
+ """
+ CREATE FOREIGN TABLE inspectdb_iris_foreign_table (
+ petal_length real,
+ petal_width real,
+ sepal_length real,
+ sepal_width real
+ ) SERVER inspectdb_server OPTIONS (
+ program 'echo 1,2,3,4',
+ format 'csv'
)
+ """,
+ )
+ self.addCleanup(
+ cursor_execute,
+ "DROP FOREIGN TABLE IF EXISTS inspectdb_iris_foreign_table",
+ "DROP SERVER IF EXISTS inspectdb_server",
+ "DROP EXTENSION IF EXISTS file_fdw",
+ )
out = StringIO()
foreign_table_model = "class InspectdbIrisForeignTable(models.Model):"
foreign_table_managed = "managed = False"
- try:
- call_command(
- "inspectdb",
- table_name_filter=inspectdb_tables_only,
- stdout=out,
- )
- output = out.getvalue()
- self.assertIn(foreign_table_model, output)
- self.assertIn(foreign_table_managed, output)
- finally:
- with connection.cursor() as cursor:
- cursor.execute(
- "DROP FOREIGN TABLE IF EXISTS inspectdb_iris_foreign_table"
- )
- cursor.execute("DROP SERVER IF EXISTS inspectdb_server")
- cursor.execute("DROP EXTENSION IF EXISTS file_fdw")
+ call_command(
+ "inspectdb",
+ table_name_filter=inspectdb_tables_only,
+ stdout=out,
+ )
+ output = out.getvalue()
+ self.assertIn(foreign_table_model, output)
+ self.assertIn(foreign_table_managed, output)
@skipUnlessDBFeature("create_test_table_with_composite_primary_key")
def test_composite_primary_key(self):
table_name = "test_table_composite_pk"
- with connection.cursor() as cursor:
- cursor.execute(
- connection.features.create_test_table_with_composite_primary_key
- )
+ cursor_execute(connection.features.create_test_table_with_composite_primary_key)
+ self.addCleanup(cursor_execute, "DROP TABLE %s" % table_name)
out = StringIO()
if connection.vendor == "sqlite":
field_type = connection.features.introspected_field_types["AutoField"]
else:
field_type = connection.features.introspected_field_types["IntegerField"]
- try:
- call_command("inspectdb", table_name, stdout=out)
- output = out.getvalue()
- self.assertIn(
- "pk = models.CompositePrimaryKey('column_1', 'column_2')",
- output,
- )
- self.assertIn(f"column_1 = models.{field_type}()", output)
- self.assertIn(
- "column_2 = models.%s()"
- % connection.features.introspected_field_types["IntegerField"],
- output,
- )
- finally:
- with connection.cursor() as cursor:
- cursor.execute("DROP TABLE %s" % table_name)
+ call_command("inspectdb", table_name, stdout=out)
+ output = out.getvalue()
+ self.assertIn(
+ "pk = models.CompositePrimaryKey('column_1', 'column_2')",
+ output,
+ )
+ self.assertIn(f"column_1 = models.{field_type}()", output)
+ self.assertIn(
+ "column_2 = models.%s()"
+ % connection.features.introspected_field_types["IntegerField"],
+ output,
+ )