1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
|
from django.db.models.expressions import ColPairs
from django.db.models.fields import composite
from django.db.models.fields.tuple_lookups import TupleIn, tuple_lookups
from django.db.models.lookups import (
Exact,
GreaterThan,
GreaterThanOrEqual,
In,
IsNull,
LessThan,
LessThanOrEqual,
)
def get_normalized_value(value, lhs):
from django.db.models import Model
if isinstance(value, Model):
if not value._is_pk_set():
raise ValueError("Model instances passed to related filters must be saved.")
value_list = []
sources = composite.unnest(lhs.output_field.path_infos[-1].target_fields)
for source in sources:
while not isinstance(value, source.model) and source.remote_field:
source = source.remote_field.model._meta.get_field(
source.remote_field.field_name
)
try:
value_list.append(getattr(value, source.attname))
except AttributeError:
# A case like
# Restaurant.objects.filter(place=restaurant_instance), where
# place is a OneToOneField and the primary key of Restaurant.
pk = value.pk
return pk if isinstance(pk, tuple) else (pk,)
return tuple(value_list)
if not isinstance(value, tuple):
return (value,)
return value
class RelatedIn(In):
def get_prep_lookup(self):
from django.db.models.sql.query import Query # avoid circular import
if isinstance(self.lhs, ColPairs):
if (
isinstance(self.rhs, Query)
and not self.rhs.has_select_fields
and self.lhs.output_field.related_model is self.rhs.model
):
self.rhs.set_values([f.name for f in self.lhs.sources])
else:
if self.rhs_is_direct_value():
# If we get here, we are dealing with single-column relations.
self.rhs = [get_normalized_value(val, self.lhs)[0] for val in self.rhs]
# We need to run the related field's get_prep_value(). Consider
# case ForeignKey to IntegerField given value 'abc'. The
# ForeignKey itself doesn't have validation for non-integers,
# so we must run validation using the target field.
if hasattr(self.lhs.output_field, "path_infos"):
# Run the target field's get_prep_value. We can safely
# assume there is only one as we don't get to the direct
# value branch otherwise.
target_field = self.lhs.output_field.path_infos[-1].target_fields[
-1
]
self.rhs = [target_field.get_prep_value(v) for v in self.rhs]
elif not getattr(self.rhs, "has_select_fields", True) and not getattr(
self.lhs.field.target_field, "primary_key", False
):
if (
getattr(self.lhs.output_field, "primary_key", False)
and self.lhs.output_field.model == self.rhs.model
):
# A case like
# Restaurant.objects.filter(place__in=restaurant_qs), where
# place is a OneToOneField and the primary key of
# Restaurant.
target_field = self.lhs.field.name
else:
target_field = self.lhs.field.target_field.name
self.rhs.set_values([target_field])
return super().get_prep_lookup()
def as_sql(self, compiler, connection):
if isinstance(self.lhs, ColPairs):
if self.rhs_is_direct_value():
values = [get_normalized_value(value, self.lhs) for value in self.rhs]
lookup = TupleIn(self.lhs, values)
else:
lookup = TupleIn(self.lhs, self.rhs)
return compiler.compile(lookup)
return super().as_sql(compiler, connection)
class RelatedLookupMixin:
def get_prep_lookup(self):
if not isinstance(self.lhs, ColPairs) and not hasattr(
self.rhs, "resolve_expression"
):
# If we get here, we are dealing with single-column relations.
self.rhs = get_normalized_value(self.rhs, self.lhs)[0]
# We need to run the related field's get_prep_value(). Consider
# case ForeignKey to IntegerField given value 'abc'. The ForeignKey
# itself doesn't have validation for non-integers, so we must run
# validation using the target field.
if self.prepare_rhs and hasattr(self.lhs.output_field, "path_infos"):
# Get the target field. We can safely assume there is only one
# as we don't get to the direct value branch otherwise.
target_field = self.lhs.output_field.path_infos[-1].target_fields[-1]
self.rhs = target_field.get_prep_value(self.rhs)
return super().get_prep_lookup()
def as_sql(self, compiler, connection):
if isinstance(self.lhs, ColPairs):
if not self.rhs_is_direct_value():
raise ValueError(
f"'{self.lookup_name}' doesn't support multi-column subqueries."
)
self.rhs = get_normalized_value(self.rhs, self.lhs)
lookup_class = tuple_lookups[self.lookup_name]
lookup = lookup_class(self.lhs, self.rhs)
return compiler.compile(lookup)
return super().as_sql(compiler, connection)
class RelatedExact(RelatedLookupMixin, Exact):
pass
class RelatedLessThan(RelatedLookupMixin, LessThan):
pass
class RelatedGreaterThan(RelatedLookupMixin, GreaterThan):
pass
class RelatedGreaterThanOrEqual(RelatedLookupMixin, GreaterThanOrEqual):
pass
class RelatedLessThanOrEqual(RelatedLookupMixin, LessThanOrEqual):
pass
class RelatedIsNull(RelatedLookupMixin, IsNull):
pass
|