""" Creates permissions for all installed apps that need permissions, and renames them on model renames. """ import getpass import sys import unicodedata from django.apps import apps as global_apps from django.contrib.auth import get_permission_codename from django.contrib.contenttypes.management import create_contenttypes from django.core import exceptions from django.core.management.color import color_style from django.db import DEFAULT_DB_ALIAS, migrations, router, transaction def _get_all_permissions(opts): """ Return (codename, name) for all permissions in the given opts. """ return [*_get_builtin_permissions(opts), *opts.permissions] def _get_builtin_permissions(opts): """ Return (codename, name) for all autogenerated permissions. By default, this is ('add', 'change', 'delete', 'view') """ perms = [] for action in opts.default_permissions: perms.append( ( get_permission_codename(action, opts), "Can %s %s" % (action, opts.verbose_name_raw), ) ) return perms def create_permissions( app_config, verbosity=2, interactive=True, using=DEFAULT_DB_ALIAS, apps=global_apps, **kwargs, ): if not app_config.models_module: return try: Permission = apps.get_model("auth", "Permission") except LookupError: return if not router.allow_migrate_model(using, Permission): return # Ensure that contenttypes are created for this app. Needed if # 'django.contrib.auth' is in INSTALLED_APPS before # 'django.contrib.contenttypes'. create_contenttypes( app_config, verbosity=verbosity, interactive=interactive, using=using, apps=apps, **kwargs, ) app_label = app_config.label try: app_config = apps.get_app_config(app_label) ContentType = apps.get_model("contenttypes", "ContentType") except LookupError: return models = list(app_config.get_models()) # Grab all the ContentTypes. ctypes = ContentType.objects.db_manager(using).get_for_models( *models, for_concrete_models=False ) # Find all the Permissions that have a content_type for a model we're # looking for. We don't need to check for codenames since we already have # a list of the ones we're going to create. all_perms = set( Permission.objects.using(using) .filter( content_type__in=set(ctypes.values()), ) .values_list("content_type", "codename") ) perms = [] for model in models: ctype = ctypes[model] for codename, name in _get_all_permissions(model._meta): if (ctype.pk, codename) not in all_perms: permission = Permission() permission._state.db = using permission.codename = codename permission.name = name permission.content_type = ctype perms.append(permission) Permission.objects.using(using).bulk_create(perms) if verbosity >= 2: for perm in perms: print("Adding permission '%s'" % perm) def _get_permission_metadata(apps, app_label, model_name): try: model = apps.get_model(app_label, model_name) except LookupError: # Model does not exist in this migration state, e.g. zero. Permission = apps.get_model("auth", "Permission") return Permission._meta.default_permissions, model_name return ( model._meta.default_permissions, model._meta.verbose_name_raw, ) def rename_permissions_after_model_rename( app_config, verbosity=2, plan=None, using=DEFAULT_DB_ALIAS, apps=global_apps, stdout=sys.stdout, **kwargs, ): if not app_config.models_module: return # This handler is connected to the global post_migrate signal, which is # emitted for *all* apps — including test configurations where # django.contrib.auth is NOT installed. try: Permission = apps.get_model("auth", "Permission") except LookupError: return if not router.allow_migrate_model(using, Permission): return db = using or router.db_for_write(Permission) app_label = app_config.label # Collect (from_model, to_model) pairs renames = [ (op.new_name, op.old_name) if backward else (op.old_name, op.new_name) for migration, backward in (plan or []) for op in migration.operations if isinstance(op, migrations.RenameModel) and migration.app_label == app_config.label ] if not renames: return planned = [] conflicts = [] for old_name, new_name in renames: old_suffix = f"_{old_name.lower()}" new_suffix = f"_{new_name.lower()}" actions, verbose_name_raw = _get_permission_metadata(apps, app_label, new_name) perms = Permission.objects.using(db).filter( content_type__app_label=app_label, codename__in=[f"{action}{old_suffix}" for action in actions], ) for perm in perms: for action in actions: if not perm.codename.startswith(action + "_"): continue old_codename = perm.codename new_codename = f"{action}{new_suffix}" new_name_str = f"Can {action} {verbose_name_raw}" planned.append((perm, old_codename, new_codename, new_name_str)) existing = { p.codename for p in Permission.objects.using(db).filter( content_type__app_label=app_label, codename__in=[new for _, _, new, _ in planned], ) } # Look for conflicts for perm, old, new, _ in planned: if new in existing and perm.codename != new: conflicts.append((perm.pk, old, new)) # Raise error if conflicts found if conflicts: if verbosity: style = color_style() for pk, old, new in conflicts: msg = ( f"Failed to rename permission {pk} from '{old}' to '{new}'. " f"Please resolve the conflict manually.\n" ) stdout.write(style.WARNING(msg)) error_message = f"{len(conflicts)} permission rename conflict(s) detected." raise RuntimeError(error_message) with transaction.atomic(using=db): for perm, _, new_codename, new_name_str in planned: perm.codename = new_codename perm.name = new_name_str perm.save(update_fields={"codename", "name"}, using=db) for _, from_codename, to_codename, _ in planned: if verbosity >= 2: stdout.write( f"Renamed permission(s): " f"{app_label}.{from_codename} → {to_codename}\n" ) def get_system_username(): """ Return the current system user's username, or an empty string if the username could not be determined. """ try: result = getpass.getuser() except (ImportError, KeyError, OSError): # TODO: Drop ImportError and KeyError when dropping support for PY312. # KeyError (Python <3.13) or OSError (Python 3.13+) will be raised by # os.getpwuid() (called by getuser()) if there is no corresponding # entry in the /etc/passwd file (for example, in a very restricted # chroot environment). return "" return result def get_default_username(check_db=True, database=DEFAULT_DB_ALIAS): """ Try to determine the current system user's username to use as a default. :param check_db: If ``True``, requires that the username does not match an existing ``auth.User`` (otherwise returns an empty string). :param database: The database where the unique check will be performed. :returns: The username, or an empty string if no username can be determined or the suggested username is already taken. """ # This file is used in apps.py, it should not trigger models import. from django.contrib.auth import models as auth_app # If the User model has been swapped out, we can't make any assumptions # about the default user name. if auth_app.User._meta.swapped: return "" default_username = get_system_username() try: default_username = ( unicodedata.normalize("NFKD", default_username) .encode("ascii", "ignore") .decode("ascii") .replace(" ", "") .lower() ) except UnicodeDecodeError: return "" # Run the username validator try: auth_app.User._meta.get_field("username").run_validators(default_username) except exceptions.ValidationError: return "" # Don't return the default username if it is already taken. if check_db and default_username: try: auth_app.User._default_manager.db_manager(database).get( username=default_username, ) except auth_app.User.DoesNotExist: pass else: return "" return default_username