diff options
Diffstat (limited to 'scripts/pr_quality/check_pr.py')
| -rw-r--r-- | scripts/pr_quality/check_pr.py | 553 |
1 file changed, 553 insertions, 0 deletions
#!/usr/bin/env python3
"""
PR quality checks for Django pull requests.

Each check is an independent function that returns None on success, or an
error message on failure. Independent checks are all run so that all issues
are reported in a single pass. Trac status and has_patch checks are skipped
when no ticket is found, since they require a ticket ID to be meaningful.

Required environment variables:
    GITHUB_TOKEN    GitHub API token
    PR_NUMBER       Pull request number
    PR_REPO         Repository in "owner/repo" format

Optional environment variables:
    AUTOCLOSE       Set to "true" to close failing PRs (default: false)
    PR_AUTHOR       PR author's GitHub login
    PR_BODY         Pull request body text
    PR_CREATED_AT   PR creation timestamp (ISO 8601)
    PR_TITLE        Pull request title
"""

import json
import logging
import os
import re
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from datetime import date, datetime, timedelta, timezone

from pr_quality.errors import (
    CHECKS_FOOTER,
    CHECKS_HEADER,
    INCOMPLETE_CHECKLIST,
    INVALID_TRAC_STATUS,
    LEVEL_ERROR,
    LEVEL_WARNING,
    MISSING_AI_DESCRIPTION,
    MISSING_AI_DISCLOSURE,
    MISSING_DESCRIPTION,
    MISSING_HAS_PATCH_FLAG,
    MISSING_TICKET_IN_PR_TITLE,
    MISSING_TRAC_TICKET,
    Message,
)

# Maximum page size accepted by GitHub's list endpoints.
GITHUB_PER_PAGE = 100
LARGE_PR_THRESHOLD = 80  # additions + deletions
# Minimum word count required for free-text sections (descriptions).
MIN_WORDS = 5
SKIPPED = object()  # Sentinel: check was not applicable and was skipped.
TICKET_NOT_FOUND = object()  # Sentinel: Trac returned HTTP 404 for the ticket.
# Timeout (seconds) applied to every outbound HTTP request.
URLOPEN_TIMEOUT_SECONDS = 15
# PRs opened before these dates predate PR template additions.
+PR_TEMPLATE_DATE = date(2024, 3, 4) # 3fcef50 -- PR template introduced +AI_DISCLOSURE_DATE = date(2026, 1, 8) # 4f580c4 -- AI disclosure added + +logger = logging.getLogger(__name__) + + +def setup_logging(logger, gha_formatter=True): + logger.setLevel(logging.DEBUG) + + if not logger.handlers and gha_formatter: + + class GHAFormatter(logging.Formatter): + _PREFIXES = { + logging.DEBUG: "::debug::", + logging.INFO: "::notice::", + logging.WARNING: "::warning::", + logging.ERROR: "::error::", + } + + def format(self, record): + msg = super().format(record) + prefix = self._PREFIXES.get(record.levelno, "") + return f"{prefix}{msg}" + + handler = logging.StreamHandler() + handler.setFormatter(GHAFormatter()) + logger.addHandler(handler) + + +def github_request(method, path, token, repo, data=None, params=None): + """Make an authenticated GitHub API request.""" + url = f"https://api.github.com/repos/{repo}{path}" + if params: + encoded_params = urllib.parse.urlencode(params) + url = f"{url}?{encoded_params}" + headers = { + "Authorization": f"Bearer {token}", + "Accept": "application/vnd.github+json", + "X-GitHub-Api-Version": "2022-11-28", + } + body = None + if data is not None: + body = json.dumps(data).encode() + headers["Content-Type"] = "application/json" + req = urllib.request.Request(url, data=body, headers=headers, method=method) + with urllib.request.urlopen(req, timeout=URLOPEN_TIMEOUT_SECONDS) as response: + return json.loads(response.read()) + + +def get_recent_commit_count(pr_author, repo, token, since_days, max_count): + """Return the number of recent commits by the author, up to max_count.""" + if not pr_author: + return 0 + + since = (datetime.now(timezone.utc) - timedelta(days=since_days)).strftime( + "%Y-%m-%dT%H:%M:%SZ" + ) + results = github_request( + "GET", + "/commits", + token, + repo, + params={"author": pr_author, "since": since, "per_page": max_count}, + ) + return len(results) + + +def get_pr_total_changes(pr_number, repo, token): + 
"""Return total lines changed in the PR (additions + deletions).""" + total_changes = 0 + page = 1 + while True: + results = github_request( + "GET", + f"/pulls/{pr_number}/files", + token, + repo, + params={"per_page": GITHUB_PER_PAGE, "page": page}, + ) + if not results: + break + total_changes += sum(f["changes"] for f in results) + if len(results) < GITHUB_PER_PAGE: + break + page += 1 + return total_changes + + +def strip_html_comments(text): + """Return text with all HTML comments removed.""" + return re.sub(r"<!--.*?-->", "", text, flags=re.DOTALL) + + +def extract_ticket_id(pr_body): + """Return the Trac ticket ID string from the PR body, or None.""" + match = re.search(r"\bticket-(\d+)\b", pr_body, re.IGNORECASE) + return match.group(1) if match else None + + +def fetch_trac_ticket(ticket_id): + """Fetch ticket data from the Trac JSON API. + + Returns a dict with ticket data on success, TICKET_NOT_FOUND if the + ticket does not exist (HTTP 404), or None on a non-fatal network error + (the caller should skip the check). + """ + url = f"https://www.djangoproject.com/trac/api/tickets/{ticket_id}" + try: + with urllib.request.urlopen(url, timeout=URLOPEN_TIMEOUT_SECONDS) as response: + return json.loads(response.read()) + except urllib.error.HTTPError as exc: + code = exc.code + exc.close() + if code == 404: + return TICKET_NOT_FOUND + logger.warning( + "HTTP %s fetching ticket %s -- skipping Trac check.", + code, + ticket_id, + ) + return None + except Exception as exc: + logger.warning( + "Could not fetch ticket %s: %s -- skipping Trac check.", + ticket_id, + exc, + ) + return None + + +def check_trac_ticket(pr_body, total_changes, threshold=LARGE_PR_THRESHOLD): + """A Trac ticket must be referenced in the ticket section. + + For PRs with fewer than threshold lines changed, "N/A" is also accepted + (e.g. for typo fixes). Larger PRs must reference a ticket. + """ + # Look for the ticket reference inside the Trac ticket number section. 
+ section_match = re.search( + r"#### Trac ticket number[^\n]*\n(.*?)(?=\r?\n####|\Z)", + pr_body, + re.DOTALL, + ) + section = section_match.group(1) if section_match else pr_body + + # Strip HTML comments before checking -- the template itself contains "N/A" + # inside a comment, which would otherwise trigger the N/A exemption below. + section = strip_html_comments(section) + + if re.search(r"\bticket-\d+\b", section, re.IGNORECASE): # valid ticket found + return None + + # N/A is accepted for trivial PRs that don't warrant a ticket. + if total_changes < threshold and re.search( + r"(?:^|\s)N/A\b", section, re.IGNORECASE | re.MULTILINE + ): + return None + + return Message(*MISSING_TRAC_TICKET, threshold=threshold) + + +def check_trac_status(ticket_id, ticket_data): + """The referenced Trac ticket must be Accepted, unresolved, and assigned. + + ticket_data is the dict returned by fetch_trac_ticket(). Passing None + skips the check (non-fatal fetch error). Passing TICKET_NOT_FOUND fails + with a generic not-ready message. + """ + if ticket_data is None: + return None # Non-fatal fetch error; skip. + if ticket_data is TICKET_NOT_FOUND: + return Message( + *INVALID_TRAC_STATUS, + ticket_id=ticket_id, + current_state="ticket not found in Trac", + ) + stage = ticket_data.get("custom", {}).get("stage", "").strip() + resolution = (ticket_data.get("resolution") or "").strip() + status = ticket_data.get("status", "").strip() + if stage == "Accepted" and not resolution and status == "assigned": + return None + current_state = [ + f"{stage=}" if stage != "Accepted" else "", + f"{resolution=}" if resolution else "", + f"{status=}" if status != "assigned" else "", + ] + return Message( + *INVALID_TRAC_STATUS, + ticket_id=ticket_id, + current_state=", ".join(s for s in current_state if s), + ) + + +def check_trac_has_patch(ticket_id, initial_data, poll_interval=1, poll_timeout=10): + """The referenced Trac ticket must have has_patch=1. 
+ + Uses initial_data (the dict returned by fetch_trac_ticket()) on the first + check, then polls via fetch_trac_ticket() every poll_interval seconds for + up to poll_timeout seconds. Set poll_timeout=0 for a single check with no + retries. Passing None for initial_data skips the check entirely. + """ + deadline = time.monotonic() + poll_timeout + elapsed = 0 + ticket_data = initial_data + + while True: + logger.info( + "Checking has_patch flag for ticket-%s (elapsed: %ss) ...", + ticket_id, + elapsed, + ) + if ticket_data is None: + return None # Non-fatal fetch error; skip. + if ticket_data is TICKET_NOT_FOUND: + # Ticket not found -- already reported by check_trac_status. + return None + has_patch = ticket_data.get("custom", {}).get("has_patch", "0").strip() + if has_patch == "1": + logger.info("ticket-%s has_patch flag is set.", ticket_id) + return None + remaining = deadline - time.monotonic() + if remaining <= 0: + break + logger.info( + " has_patch not yet set -- will retry in %ss.", + poll_interval, + ) + sleep_time = min(poll_interval, remaining) + time.sleep(sleep_time) + elapsed += int(sleep_time) + ticket_data = fetch_trac_ticket(ticket_id) + + logger.warning( + "ticket-%s has_patch flag was not set after %ss.", + ticket_id, + poll_timeout, + ) + return Message(*MISSING_HAS_PATCH_FLAG, ticket_id=ticket_id) + + +def check_pr_title_has_ticket(pr_title, ticket_id): + """The PR title must include the ticket number (e.g. #36991). + + This enables Trac's auto-link feature, which associates the PR with the + ticket when the title follows the commit message format. + """ + if re.search(rf"#{ticket_id}\b", pr_title): + return None + return Message(*MISSING_TICKET_IN_PR_TITLE, ticket_id=ticket_id) + + +def check_branch_description(pr_body): + """The branch description must be present. + + The description should not contain the placeholder, and should be at least + 5 words long. 
+ """ + placeholder = ( + "Provide a concise overview of the issue or rationale behind the" + " proposed changes." + ) + + description_match = re.search( + r"#### Branch description[ \t]*\r?\n(.*?)(?=\r?\n####|\Z)", + pr_body, + re.DOTALL, + ) + if not description_match: + return Message(*MISSING_DESCRIPTION) + + # Strip HTML comments before evaluating content. + cleaned = strip_html_comments(description_match.group(1)).strip() + + if not cleaned or placeholder in cleaned or len(cleaned.split()) < MIN_WORDS: + return Message(*MISSING_DESCRIPTION) + + return None + + +def check_ai_disclosure(pr_body): + """Exactly one AI disclosure checkbox must be selected. + + If the "AI tools were used" option is checked, at least 5 words of + additional description must be present in that section. + """ + ai_match = re.search( + r"#### AI Assistance Disclosure[^\n]*\n(.*?)(?=\r?\n####|\Z)", + pr_body, + re.DOTALL, + ) + if not ai_match: + return Message(*MISSING_AI_DISCLOSURE) + + section = strip_html_comments(ai_match.group(1)) + no_ai_checked = bool( + re.search(r"-\s*\[x\].*?No AI tools were used", section, re.IGNORECASE) + ) + ai_used_checked = bool( + re.search(r"-\s*\[x\].*?If AI tools were used", section, re.IGNORECASE) + ) + + # Must check exactly one option. + if no_ai_checked == ai_used_checked: + return Message(*MISSING_AI_DISCLOSURE) + + if ai_used_checked: + # Collect non-checkbox lines for word count. + extra_lines = [ + line.strip() + for line in section.splitlines() + if line.strip() and not line.strip().startswith("- [") + ] + # Ensure PR author includes at least 5 words about their AI use. 
+ if len(" ".join(extra_lines).split()) < MIN_WORDS: + return Message(*MISSING_AI_DESCRIPTION) + + return None + + +def check_checklist(pr_body): + """The first five items in the Checklist section must be checked.""" + checklist_match = re.search( + r"#### Checklist[ \t]*\r?\n(.*?)(?=\r?\n####|\Z)", pr_body, re.DOTALL + ) + if not checklist_match: + return Message(*INCOMPLETE_CHECKLIST) + + checkboxes = re.findall(r"-\s*\[(.)\]", checklist_match.group(1)) + + if len(checkboxes) < 5 or not all(c.lower() == "x" for c in checkboxes[:5]): + return Message(*INCOMPLETE_CHECKLIST) + + return None + + +def write_job_summary(pr_number, results, summary_file=None): + """Write a Markdown job summary to the given file path (if provided).""" + if not summary_file: + return + + lines = [ + f"## PR #{pr_number} Quality Check Results\n", + "| | Check | Result |", + "| --- | --- | --- |", + ] + for name, result, level in results: + if result is SKIPPED: + icon, status = "⏭️", "Skipped" + elif result is None: + icon, status = "✅", "Passed" + else: + icon, status = level + lines.append(f"| {icon} | {name} | {status} |") + + with open(summary_file, "a") as f: + f.write("\n".join(lines) + "\n") + + +def main( + repo, + token, + pr_author, + pr_body, + pr_number, + pr_title="", + pr_created_at=None, + autoclose=True, + summary_file=None, + gha_formatter=False, +): + setup_logging(logger, gha_formatter) + + created_date = ( + datetime.fromisoformat(pr_created_at).date() if pr_created_at else None + ) + if created_date is not None and created_date <= PR_TEMPLATE_DATE: + logger.info( + "PR #%s is older than PR template (%s) -- skipping all checks.", + pr_number, + PR_TEMPLATE_DATE, + ) + return + + commit_count = get_recent_commit_count( + pr_author, repo, token, since_days=365 * 3, max_count=5 + ) + if commit_count >= 5: + logger.info( + "PR #%s author is an established contributor -- skipping all checks.", + pr_number, + ) + return + + pr_title_result = SKIPPED + total_changes = 
get_pr_total_changes(pr_number, repo, token) + ticket_result = check_trac_ticket(pr_body, total_changes) + ticket_status_result = SKIPPED + ticket_has_patch_result = SKIPPED + ticket_id = extract_ticket_id(pr_body) if ticket_result is None else None + if ticket_id: + pr_title_result = check_pr_title_has_ticket(pr_title, ticket_id) + ticket_data = fetch_trac_ticket(ticket_id) + ticket_status_result = check_trac_status(ticket_id, ticket_data) + if ticket_status_result is None: + # Polling is disabled (poll_timeout=0): has_patch is a non-fatal + # warning, and contributors often update Trac after reviewing their + # PR, making any fixed polling window unreliable. + ticket_has_patch_result = check_trac_has_patch( + ticket_id, ticket_data, poll_timeout=0 + ) + else: + logger.info("Trac ticket is not Accepted -- skipping has_patch check.") + else: + logger.info("No Trac ticket -- skipping status and has_patch checks.") + + if created_date is not None and created_date <= AI_DISCLOSURE_DATE: + ai_disclosure_result = SKIPPED + logger.info( + "PR #%s is older than AI Disclosure section (%s) -- skipping AI checks.", + pr_number, + AI_DISCLOSURE_DATE, + ) + else: + ai_disclosure_result = check_ai_disclosure(pr_body) + + results = [ + ("Trac ticket referenced", ticket_result, LEVEL_ERROR), + ("Trac ticket status is Accepted", ticket_status_result, LEVEL_ERROR), + ("Trac ticket has_patch flag set", ticket_has_patch_result, LEVEL_WARNING), + ("PR title includes ticket number", pr_title_result, LEVEL_WARNING), + ("Branch description provided", check_branch_description(pr_body), LEVEL_ERROR), + ("AI disclosure completed", ai_disclosure_result, LEVEL_ERROR), + ("Checklist completed", check_checklist(pr_body), LEVEL_ERROR), + ] + write_job_summary(pr_number, results, summary_file) + + failures = [ + msg.as_details(level=level) + for _, msg, level in results + if msg is not None and msg is not SKIPPED and level == LEVEL_ERROR + ] + warning_msgs = [ + msg.as_details(level=level) + for 
_, msg, level in results + if msg is not None and msg is not SKIPPED and level == LEVEL_WARNING + ] + if not failures and not warning_msgs: + logger.info("PR #%s passed all quality checks.", pr_number) + return + + github_request( + "POST", + f"/issues/{pr_number}/comments", + token, + repo, + {"body": "\n\n".join([CHECKS_HEADER, *failures, *warning_msgs, CHECKS_FOOTER])}, + ) + if not failures: + logger.warning( + "PR #%s has %s warning(s), adding informational comment.", + pr_number, + len(warning_msgs), + ) + return + + msg = "PR #%s failed %s check(s), adding comment with details." + if not autoclose or commit_count > 0: + logger.warning( + msg + " Not closing the PR given %s.", + pr_number, + len(failures), + "warning-only mode" if not autoclose else "recent contributions", + ) + else: + logger.error( + msg + " Closing the PR given lack of recent contributions.", + pr_number, + len(failures), + ) + github_request("PATCH", f"/pulls/{pr_number}", token, repo, {"state": "closed"}) + return 1 + + +if __name__ == "__main__": + sys.exit( + main( + repo=os.environ["PR_REPO"], + token=os.environ["GITHUB_TOKEN"], + pr_author=os.environ.get("PR_AUTHOR", ""), + pr_body=os.environ.get("PR_BODY", ""), + pr_number=os.environ["PR_NUMBER"], + pr_title=os.environ.get("PR_TITLE", ""), + pr_created_at=os.environ.get("PR_CREATED_AT"), + autoclose=os.environ.get("AUTOCLOSE", "").lower() == "true", + summary_file=os.environ.get("GITHUB_STEP_SUMMARY"), + gha_formatter=os.environ.get("GITHUB_ACTIONS"), + ) + ) |
