1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
|
"""
Creates permissions for all installed apps that need permissions, and renames
them on model renames.
"""
import getpass
import sys
import unicodedata
from django.apps import apps as global_apps
from django.contrib.auth import get_permission_codename
from django.contrib.contenttypes.management import create_contenttypes
from django.core import exceptions
from django.core.management.color import color_style
from django.db import DEFAULT_DB_ALIAS, migrations, router, transaction
def _get_all_permissions(opts):
    """
    Collect every (codename, name) pair for the given opts: the
    autogenerated permissions first, followed by ``opts.permissions``.
    """
    all_permissions = list(_get_builtin_permissions(opts))
    all_permissions.extend(opts.permissions)
    return all_permissions
def _get_builtin_permissions(opts):
    """
    Build the (codename, name) pairs of the autogenerated permissions, one
    per action listed in ``opts.default_permissions``.
    By default, the actions are ('add', 'change', 'delete', 'view').
    """
    return [
        (
            get_permission_codename(action, opts),
            "Can %s %s" % (action, opts.verbose_name_raw),
        )
        for action in opts.default_permissions
    ]
def create_permissions(
    app_config,
    verbosity=2,
    interactive=True,
    using=DEFAULT_DB_ALIAS,
    apps=global_apps,
    **kwargs,
):
    """
    Create the missing Permission rows for every model of ``app_config``.

    Nothing is done when the app has no models module, when the Permission
    or ContentType models (or the app itself) can't be looked up in ``apps``,
    or when the router disallows migrating Permission on ``using``.
    With ``verbosity`` >= 2, each created permission is printed.
    """
    if not app_config.models_module:
        return

    try:
        Permission = apps.get_model("auth", "Permission")
    except LookupError:
        return

    if not router.allow_migrate_model(using, Permission):
        return

    # Content types for this app must exist before permissions can point at
    # them. This matters when 'django.contrib.auth' comes before
    # 'django.contrib.contenttypes' in INSTALLED_APPS.
    create_contenttypes(
        app_config,
        verbosity=verbosity,
        interactive=interactive,
        using=using,
        apps=apps,
        **kwargs,
    )

    label = app_config.label
    try:
        app_config = apps.get_app_config(label)
        ContentType = apps.get_model("contenttypes", "ContentType")
    except LookupError:
        return

    app_models = list(app_config.get_models())

    # Map every model of the app to its ContentType.
    content_types = ContentType.objects.db_manager(using).get_for_models(
        *app_models, for_concrete_models=False
    )

    # (content_type id, codename) pairs already stored for those content
    # types. Filtering on codename is unnecessary: the wanted codenames are
    # checked against this set below.
    existing = set(
        Permission.objects.using(using)
        .filter(content_type__in=set(content_types.values()))
        .values_list("content_type", "codename")
    )

    missing = []
    for model in app_models:
        ctype = content_types[model]
        for codename, name in _get_all_permissions(model._meta):
            if (ctype.pk, codename) in existing:
                continue
            permission = Permission()
            permission._state.db = using
            permission.codename = codename
            permission.name = name
            permission.content_type = ctype
            missing.append(permission)

    Permission.objects.using(using).bulk_create(missing)

    if verbosity < 2:
        return
    for permission in missing:
        print("Adding permission '%s'" % permission)
def _get_permission_metadata(apps, app_label, model_name):
try:
model = apps.get_model(app_label, model_name)
except LookupError:
# Model does not exist in this migration state, e.g. zero.
Permission = apps.get_model("auth", "Permission")
return Permission._meta.default_permissions, model_name
return (
model._meta.default_permissions,
model._meta.verbose_name_raw,
)
def rename_permissions_after_model_rename(
    app_config,
    verbosity=2,
    plan=None,
    using=DEFAULT_DB_ALIAS,
    apps=global_apps,
    stdout=sys.stdout,
    **kwargs,
):
    """
    Rename the autogenerated permissions of models renamed by a migration.

    For every ``RenameModel`` operation in ``plan`` that belongs to
    ``app_config``, permissions of the app whose codename is exactly
    ``<action>_<old model name>`` get the new model name in their codename
    and a regenerated ``"Can <action> <verbose name>"`` name. For backward
    migrations the direction of the rename is swapped.

    :param app_config: The app whose migrations are inspected.
    :param verbosity: Conflicts are reported when truthy; successful renames
        are reported when >= 2.
    :param plan: Iterable of ``(migration, backward)`` pairs; ``None`` is
        treated as an empty plan.
    :param using: Database alias on which the permissions are updated.
    :param apps: App registry used to look up models.
    :param stdout: Stream that receives the messages.
    :raises RuntimeError: If a target codename is already used by another
        permission of the app. Raised before any permission is modified.
    """
    if not app_config.models_module:
        return

    # This handler is connected to the global post_migrate signal, which is
    # emitted for *all* apps — including test configurations where
    # django.contrib.auth is NOT installed.
    try:
        Permission = apps.get_model("auth", "Permission")
    except LookupError:
        return
    if not router.allow_migrate_model(using, Permission):
        return

    db = using or router.db_for_write(Permission)
    app_label = app_config.label

    # Collect (from_model, to_model) pairs.
    renames = [
        (op.new_name, op.old_name) if backward else (op.old_name, op.new_name)
        for migration, backward in (plan or [])
        for op in migration.operations
        if isinstance(op, migrations.RenameModel)
        and migration.app_label == app_config.label
    ]
    if not renames:
        return

    # Entries are (permission, old codename, new codename, new name).
    planned = []
    for old_name, new_name in renames:
        old_suffix = f"_{old_name.lower()}"
        new_suffix = f"_{new_name.lower()}"
        actions, verbose_name_raw = _get_permission_metadata(
            apps, app_label, new_name
        )
        # Map each exact old codename to its action. A prefix test is not
        # enough: with actions such as "view" and "view_all", the codename
        # "view_all_<model>" also starts with "view_" and would be planned
        # twice, once with the wrong target codename.
        action_by_codename = {
            f"{action}{old_suffix}": action for action in actions
        }
        perms = Permission.objects.using(db).filter(
            content_type__app_label=app_label,
            codename__in=list(action_by_codename),
        )
        for perm in perms:
            action = action_by_codename.get(perm.codename)
            if action is None:
                # Not an exact match for any expected codename (possible
                # with a case-insensitive database collation).
                continue
            planned.append(
                (
                    perm,
                    perm.codename,
                    f"{action}{new_suffix}",
                    f"Can {action} {verbose_name_raw}",
                )
            )

    if not planned:
        # Nothing to rename: skip the conflict lookup and the transaction.
        return

    # Codenames among the targets that are already present for this app.
    existing = {
        p.codename
        for p in Permission.objects.using(db).filter(
            content_type__app_label=app_label,
            codename__in=[new for _, _, new, _ in planned],
        )
    }

    # Look for conflicts: a target codename that is already taken.
    conflicts = [
        (perm.pk, old, new)
        for perm, old, new, _ in planned
        if new in existing and perm.codename != new
    ]

    # Report every conflict, then abort before touching the database.
    if conflicts:
        if verbosity:
            style = color_style()
            for pk, old, new in conflicts:
                msg = (
                    f"Failed to rename permission {pk} from '{old}' to '{new}'. "
                    f"Please resolve the conflict manually.\n"
                )
                stdout.write(style.WARNING(msg))
        error_message = f"{len(conflicts)} permission rename conflict(s) detected."
        raise RuntimeError(error_message)

    # Apply all renames atomically so a failure leaves no partial rename.
    with transaction.atomic(using=db):
        for perm, _, new_codename, new_name_str in planned:
            perm.codename = new_codename
            perm.name = new_name_str
            perm.save(update_fields={"codename", "name"}, using=db)

    if verbosity >= 2:
        for _, from_codename, to_codename, _ in planned:
            stdout.write(
                f"Renamed permission(s): "
                f"{app_label}.{from_codename} → {to_codename}\n"
            )
def get_system_username():
    """
    Look up the username of the current system user.

    An empty string is returned when the username can't be determined.
    """
    try:
        return getpass.getuser()
    except (ImportError, KeyError, OSError):
        # TODO: Drop ImportError and KeyError when dropping support for PY312.
        # os.getpwuid() (called by getuser()) raises KeyError (Python <3.13)
        # or OSError (Python 3.13+) when the user has no entry in the
        # /etc/passwd file (for example, in a very restricted chroot
        # environment).
        return ""
def get_default_username(check_db=True, database=DEFAULT_DB_ALIAS):
    """
    Suggest a default username derived from the current system user.

    :param check_db: When ``True``, a suggestion that matches an existing
        ``auth.User`` is rejected (an empty string is returned instead).
    :param database: The database queried for the uniqueness check.
    :returns: The suggested username, or an empty string if none can be
        determined, it fails validation, or it is already taken.
    """
    # Imported lazily: this file is used in apps.py and must not trigger the
    # models import when it is loaded.
    from django.contrib.auth import models as auth_app

    user_model = auth_app.User

    # With a swapped-out User model no assumption can be made about the
    # default user name.
    if user_model._meta.swapped:
        return ""

    system_name = get_system_username()
    try:
        # Decompose accented characters, drop whatever is not ASCII, then
        # strip spaces and lowercase.
        decomposed = unicodedata.normalize("NFKD", system_name)
        ascii_only = decomposed.encode("ascii", "ignore").decode("ascii")
        candidate = ascii_only.replace(" ", "").lower()
    except UnicodeDecodeError:
        return ""

    # Run the username validator.
    username_field = user_model._meta.get_field("username")
    try:
        username_field.run_validators(candidate)
    except exceptions.ValidationError:
        return ""

    # Don't suggest a username that is already taken.
    if check_db and candidate:
        users = user_model._default_manager.db_manager(database)
        try:
            users.get(username=candidate)
        except user_model.DoesNotExist:
            return candidate
        return ""
    return candidate
|