summaryrefslogtreecommitdiff
path: root/scripts/pr_quality/check_pr.py
diff options
context:
space:
mode:
author: Natalia <124304+nessita@users.noreply.github.com> 2026-03-27 22:41:10 -0300
committer: nessita <124304+nessita@users.noreply.github.com> 2026-04-16 12:45:03 -0300
commit: 4593ae93ae39d0c33ef9edc667fe3425fcf61b28 (patch)
tree: b63e75e7e8f20d140aa01bc7b0b3f90cad4b14c2 /scripts/pr_quality/check_pr.py
parent: 82a2465f71a1f5cbfe3811952c0d07482dea71c6 (diff)
Added automated quality checks for PRs as a GitHub Actions workflow.
This work adds automated PR quality checks as a GitHub Actions workflow to enforce contribution requirements consistently and reduce the manual burden on reviewers for incoming PRs. Thanks to the many reviewers providing meaningful feedback. Co-authored-by: Frank Wiles <frank@revsys.com>
Diffstat (limited to 'scripts/pr_quality/check_pr.py')
-rw-r--r-- scripts/pr_quality/check_pr.py 553
1 file changed, 553 insertions, 0 deletions
diff --git a/scripts/pr_quality/check_pr.py b/scripts/pr_quality/check_pr.py
new file mode 100644
index 0000000000..054d3e6adf
--- /dev/null
+++ b/scripts/pr_quality/check_pr.py
@@ -0,0 +1,553 @@
+#!/usr/bin/env python3
+"""
+PR quality checks for Django pull requests.
+
+Each check is an independent function that returns None on success, or an
+error message on failure. Independent checks are all run so that all issues
+are reported in a single pass. Trac status and has_patch checks are skipped
+when no ticket is found, since they require a ticket ID to be meaningful.
+
+Required environment variables:
+ GITHUB_TOKEN GitHub API token
+ PR_NUMBER Pull request number
+ PR_REPO Repository in "owner/repo" format
+
+Optional environment variables:
+ AUTOCLOSE Set to "true" to close failing PRs (default: false)
+ PR_AUTHOR PR author's GitHub login
+ PR_BODY Pull request body text
+ PR_CREATED_AT PR creation timestamp (ISO 8601)
+ PR_TITLE Pull request title
+"""
+
+import json
+import logging
+import os
+import re
+import sys
+import time
+import urllib.error
+import urllib.parse
+import urllib.request
+from datetime import date, datetime, timedelta, timezone
+
+from pr_quality.errors import (
+ CHECKS_FOOTER,
+ CHECKS_HEADER,
+ INCOMPLETE_CHECKLIST,
+ INVALID_TRAC_STATUS,
+ LEVEL_ERROR,
+ LEVEL_WARNING,
+ MISSING_AI_DESCRIPTION,
+ MISSING_AI_DISCLOSURE,
+ MISSING_DESCRIPTION,
+ MISSING_HAS_PATCH_FLAG,
+ MISSING_TICKET_IN_PR_TITLE,
+ MISSING_TRAC_TICKET,
+ Message,
+)
+
GITHUB_PER_PAGE = 100  # Page size used for paginated GitHub list requests.
LARGE_PR_THRESHOLD = 80  # additions + deletions
MIN_WORDS = 5  # Minimum word count for free-text sections (description, AI use).
SKIPPED = object()  # Sentinel: check was not applicable and was skipped.
TICKET_NOT_FOUND = object()  # Sentinel: Trac returned HTTP 404 for the ticket.
URLOPEN_TIMEOUT_SECONDS = 15  # Timeout for every outbound HTTP request.
# PRs opened before these dates predate PR template additions.
PR_TEMPLATE_DATE = date(2024, 3, 4)  # 3fcef50 -- PR template introduced
AI_DISCLOSURE_DATE = date(2026, 1, 8)  # 4f580c4 -- AI disclosure added

logger = logging.getLogger(__name__)
+
+
def setup_logging(logger, gha_formatter=True):
    """Configure *logger* for DEBUG output, optionally with GHA annotations.

    When gha_formatter is truthy and the logger has no handlers yet, a
    stream handler is attached that prefixes each record with the matching
    GitHub Actions workflow command (::debug::, ::notice::, ...).
    """
    logger.setLevel(logging.DEBUG)

    # Don't stack handlers on repeated calls, and skip entirely when the
    # GHA-style formatter was not requested.
    if logger.handlers or not gha_formatter:
        return

    class GHAFormatter(logging.Formatter):
        """Prefix log records with GitHub Actions annotation commands."""

        _PREFIXES = {
            logging.DEBUG: "::debug::",
            logging.INFO: "::notice::",
            logging.WARNING: "::warning::",
            logging.ERROR: "::error::",
        }

        def format(self, record):
            formatted = super().format(record)
            return self._PREFIXES.get(record.levelno, "") + formatted

    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(GHAFormatter())
    logger.addHandler(stream_handler)
+
+
def github_request(method, path, token, repo, data=None, params=None):
    """Make an authenticated GitHub API request and return the parsed JSON.

    method and path identify the endpoint relative to the repository, e.g.
    ("GET", "/pulls/123/files"). data, when given, is JSON-encoded as the
    request body; params are URL-encoded into the query string. Returns the
    decoded JSON response, or None when the response body is empty (e.g.
    HTTP 204 No Content) -- json.loads("") would otherwise raise.
    Network/HTTP errors propagate to the caller.
    """
    url = f"https://api.github.com/repos/{repo}{path}"
    if params:
        url = f"{url}?{urllib.parse.urlencode(params)}"
    headers = {
        "Authorization": f"Bearer {token}",
        "Accept": "application/vnd.github+json",
        "X-GitHub-Api-Version": "2022-11-28",
    }
    body = None
    if data is not None:
        body = json.dumps(data).encode()
        headers["Content-Type"] = "application/json"
    req = urllib.request.Request(url, data=body, headers=headers, method=method)
    with urllib.request.urlopen(req, timeout=URLOPEN_TIMEOUT_SECONDS) as response:
        payload = response.read()
    # Guard against empty bodies (204-style responses) before decoding.
    return json.loads(payload) if payload else None
+
+
def get_recent_commit_count(pr_author, repo, token, since_days, max_count):
    """Return how many commits the author made recently, capped at max_count.

    "Recently" means within the last since_days days. An empty/missing
    author yields 0 without hitting the API.
    """
    if not pr_author:
        return 0

    cutoff = datetime.now(timezone.utc) - timedelta(days=since_days)
    commits = github_request(
        "GET",
        "/commits",
        token,
        repo,
        params={
            "author": pr_author,
            "since": cutoff.strftime("%Y-%m-%dT%H:%M:%SZ"),
            # Fetch at most max_count results; we never need more.
            "per_page": max_count,
        },
    )
    return len(commits)
+
+
def get_pr_total_changes(pr_number, repo, token):
    """Return the PR's total lines changed (additions + deletions).

    Walks the paginated /files endpoint until an empty or short page
    signals the end of the listing.
    """
    total = 0
    page = 0
    while True:
        page += 1
        files = github_request(
            "GET",
            f"/pulls/{pr_number}/files",
            token,
            repo,
            params={"per_page": GITHUB_PER_PAGE, "page": page},
        )
        if not files:
            return total
        total += sum(entry["changes"] for entry in files)
        # A short page means this was the last one; stop paginating.
        if len(files) < GITHUB_PER_PAGE:
            return total
+
+
def strip_html_comments(text):
    """Return *text* with every ``<!-- ... -->`` comment removed.

    DOTALL lets a single comment span multiple lines.
    """
    comment_re = re.compile(r"<!--.*?-->", re.DOTALL)
    return comment_re.sub("", text)
+
+
def extract_ticket_id(pr_body):
    """Return the first Trac ticket ID referenced as ``ticket-NNN``, or None."""
    found = re.search(r"\bticket-(\d+)\b", pr_body, re.IGNORECASE)
    if found is None:
        return None
    return found.group(1)
+
+
def fetch_trac_ticket(ticket_id):
    """Fetch ticket data from the Trac JSON API.

    Returns the decoded ticket dict on success, the TICKET_NOT_FOUND
    sentinel when Trac answers HTTP 404, and None for any other failure
    (non-404 HTTP errors, network trouble) so the caller can skip the
    dependent checks.
    """
    url = f"https://www.djangoproject.com/trac/api/tickets/{ticket_id}"
    try:
        with urllib.request.urlopen(url, timeout=URLOPEN_TIMEOUT_SECONDS) as response:
            return json.loads(response.read())
    except urllib.error.HTTPError as exc:
        status = exc.code
        exc.close()
        if status == 404:
            return TICKET_NOT_FOUND
        logger.warning(
            "HTTP %s fetching ticket %s -- skipping Trac check.",
            status,
            ticket_id,
        )
    except Exception as exc:
        # Deliberately broad: any fetch problem is non-fatal for the checks.
        logger.warning(
            "Could not fetch ticket %s: %s -- skipping Trac check.",
            ticket_id,
            exc,
        )
    return None
+
+
def check_trac_ticket(pr_body, total_changes, threshold=LARGE_PR_THRESHOLD):
    """A Trac ticket must be referenced in the ticket section.

    PRs changing fewer than threshold lines may instead declare "N/A"
    (e.g. for typo fixes); anything larger must reference a real ticket.
    """
    # Prefer the dedicated "Trac ticket number" template section; fall back
    # to scanning the whole body when the section is absent.
    section_re = re.compile(
        r"#### Trac ticket number[^\n]*\n(.*?)(?=\r?\n####|\Z)", re.DOTALL
    )
    section_match = section_re.search(pr_body)
    if section_match:
        section = section_match.group(1)
    else:
        section = pr_body

    # The template itself carries "N/A" inside an HTML comment, so comments
    # must be stripped before the N/A exemption below is evaluated.
    section = strip_html_comments(section)

    if re.search(r"\bticket-\d+\b", section, re.IGNORECASE):
        return None  # A valid ticket reference was found.

    # Trivial PRs that don't warrant a ticket may say N/A instead.
    is_trivial = total_changes < threshold
    has_na = re.search(r"(?:^|\s)N/A\b", section, re.IGNORECASE | re.MULTILINE)
    if is_trivial and has_na:
        return None

    return Message(*MISSING_TRAC_TICKET, threshold=threshold)
+
+
def check_trac_status(ticket_id, ticket_data):
    """The referenced Trac ticket must be Accepted, unresolved, and assigned.

    ticket_data is the value returned by fetch_trac_ticket(): None skips
    the check (non-fatal fetch error), TICKET_NOT_FOUND fails with a
    generic not-ready message, and a dict is validated field by field.
    """
    if ticket_data is None:
        # Fetch failed non-fatally; nothing to validate.
        return None
    if ticket_data is TICKET_NOT_FOUND:
        return Message(
            *INVALID_TRAC_STATUS,
            ticket_id=ticket_id,
            current_state="ticket not found in Trac",
        )

    stage = ticket_data.get("custom", {}).get("stage", "").strip()
    resolution = (ticket_data.get("resolution") or "").strip()
    status = ticket_data.get("status", "").strip()

    # Collect only the fields that deviate from the required state, in the
    # same stage/resolution/status order as the success condition.
    problems = []
    if stage != "Accepted":
        problems.append(f"{stage=}")
    if resolution:
        problems.append(f"{resolution=}")
    if status != "assigned":
        problems.append(f"{status=}")

    if not problems:
        return None
    return Message(
        *INVALID_TRAC_STATUS,
        ticket_id=ticket_id,
        current_state=", ".join(problems),
    )
+
+
def check_trac_has_patch(ticket_id, initial_data, poll_interval=1, poll_timeout=10):
    """The referenced Trac ticket must have has_patch=1.

    Uses initial_data (the dict returned by fetch_trac_ticket()) on the first
    check, then polls via fetch_trac_ticket() every poll_interval seconds for
    up to poll_timeout seconds. Set poll_timeout=0 for a single check with no
    retries. Passing None for initial_data skips the check entirely.

    Returns None when the flag is set (or the check is skipped), otherwise a
    MISSING_HAS_PATCH_FLAG Message after the polling window expires.
    """
    deadline = time.monotonic() + poll_timeout
    elapsed = 0  # Approximate seconds slept so far; used in log output only.
    ticket_data = initial_data

    while True:
        logger.info(
            "Checking has_patch flag for ticket-%s (elapsed: %ss) ...",
            ticket_id,
            elapsed,
        )
        if ticket_data is None:
            return None  # Non-fatal fetch error; skip.
        if ticket_data is TICKET_NOT_FOUND:
            # Ticket not found -- already reported by check_trac_status.
            return None
        # Trac stores has_patch as a string flag under the "custom" mapping.
        has_patch = ticket_data.get("custom", {}).get("has_patch", "0").strip()
        if has_patch == "1":
            logger.info("ticket-%s has_patch flag is set.", ticket_id)
            return None
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break  # Polling window exhausted; report the missing flag below.
        logger.info(
            "  has_patch not yet set -- will retry in %ss.",
            poll_interval,
        )
        # Never sleep past the deadline: cap the final sleep to what's left.
        sleep_time = min(poll_interval, remaining)
        time.sleep(sleep_time)
        elapsed += int(sleep_time)
        ticket_data = fetch_trac_ticket(ticket_id)

    logger.warning(
        "ticket-%s has_patch flag was not set after %ss.",
        ticket_id,
        poll_timeout,
    )
    return Message(*MISSING_HAS_PATCH_FLAG, ticket_id=ticket_id)
+
+
def check_pr_title_has_ticket(pr_title, ticket_id):
    """The PR title must mention the ticket number (e.g. ``#36991``).

    This enables Trac's auto-link feature, which associates the PR with the
    ticket when the title follows the commit message format.
    """
    # \b prevents e.g. ticket 123 matching inside "#1234".
    pattern = rf"#{ticket_id}\b"
    if re.search(pattern, pr_title) is None:
        return Message(*MISSING_TICKET_IN_PR_TITLE, ticket_id=ticket_id)
    return None
+
+
def check_branch_description(pr_body):
    """The branch description must be present and meaningful.

    The section must exist, must not still contain the template
    placeholder, and must hold at least MIN_WORDS words once HTML
    comments are removed.
    """
    placeholder = (
        "Provide a concise overview of the issue or rationale behind the"
        " proposed changes."
    )

    found = re.search(
        r"#### Branch description[ \t]*\r?\n(.*?)(?=\r?\n####|\Z)",
        pr_body,
        re.DOTALL,
    )
    if found is None:
        return Message(*MISSING_DESCRIPTION)

    # Ignore the template's explanatory HTML comments when measuring the
    # author's own words.
    description = strip_html_comments(found.group(1)).strip()

    too_short = len(description.split()) < MIN_WORDS
    if not description or placeholder in description or too_short:
        return Message(*MISSING_DESCRIPTION)
    return None
+
+
def check_ai_disclosure(pr_body):
    """Exactly one AI disclosure checkbox must be selected.

    When the "AI tools were used" option is checked, the section must also
    contain at least MIN_WORDS words describing how AI was used.
    """
    disclosure = re.search(
        r"#### AI Assistance Disclosure[^\n]*\n(.*?)(?=\r?\n####|\Z)",
        pr_body,
        re.DOTALL,
    )
    if disclosure is None:
        return Message(*MISSING_AI_DISCLOSURE)

    # Drop template HTML comments so they never count as the author's text.
    section = re.sub(r"<!--.*?-->", "", disclosure.group(1), flags=re.DOTALL)

    def is_checked(label):
        """True when a ``- [x]`` checkbox line mentioning *label* exists."""
        return bool(re.search(rf"-\s*\[x\].*?{label}", section, re.IGNORECASE))

    no_ai_checked = is_checked("No AI tools were used")
    ai_used_checked = is_checked("If AI tools were used")

    # Exactly one of the two options must be ticked; both or neither fails.
    if no_ai_checked == ai_used_checked:
        return Message(*MISSING_AI_DISCLOSURE)

    if not ai_used_checked:
        return None

    # Count the author's own (non-checkbox) words about their AI use.
    prose = " ".join(
        stripped
        for line in section.splitlines()
        if (stripped := line.strip()) and not stripped.startswith("- [")
    )
    if len(prose.split()) < MIN_WORDS:
        return Message(*MISSING_AI_DESCRIPTION)
    return None
+
+
def check_checklist(pr_body):
    """The first five items in the Checklist section must all be checked."""
    section = re.search(
        r"#### Checklist[ \t]*\r?\n(.*?)(?=\r?\n####|\Z)", pr_body, re.DOTALL
    )
    if section is None:
        return Message(*INCOMPLETE_CHECKLIST)

    # Each checkbox yields its single marker character ("x", "X", or " ").
    boxes = re.findall(r"-\s*\[(.)\]", section.group(1))
    if len(boxes) >= 5 and all(mark.lower() == "x" for mark in boxes[:5]):
        return None
    return Message(*INCOMPLETE_CHECKLIST)
+
+
def write_job_summary(pr_number, results, summary_file=None):
    """Append a Markdown results table to summary_file, if one is given.

    results is a sequence of (name, result, level) triples where result is
    None (passed), the SKIPPED sentinel, or a failure Message.
    """
    if not summary_file:
        return

    rows = [
        f"## PR #{pr_number} Quality Check Results\n",
        "| | Check | Result |",
        "| --- | --- | --- |",
    ]
    for name, result, level in results:
        if result is None:
            icon, status = "✅", "Passed"
        elif result is SKIPPED:
            icon, status = "⏭️", "Skipped"
        else:
            # Failed checks reuse the icon/label pair from the level constant.
            icon, status = level
        rows.append(f"| {icon} | {name} | {status} |")

    with open(summary_file, "a") as summary:
        summary.write("\n".join(rows) + "\n")
+
+
def main(
    repo,
    token,
    pr_author,
    pr_body,
    pr_number,
    pr_title="",
    pr_created_at=None,
    autoclose=True,
    summary_file=None,
    gha_formatter=False,
):
    """Run every PR quality check and report (and optionally act on) results.

    Returns None when the PR passes, all checks are skipped, or failures are
    reported without closing; returns 1 only when the PR is auto-closed.
    The __main__ block feeds the return value to sys.exit().

    NOTE(review): the autoclose=True default here conflicts with the module
    docstring, which documents AUTOCLOSE as defaulting to false. The
    __main__ entry point always passes an explicit value, so the default
    only matters for direct callers -- confirm the intended default.
    """
    setup_logging(logger, gha_formatter)

    created_date = (
        datetime.fromisoformat(pr_created_at).date() if pr_created_at else None
    )
    # PRs predating the PR template cannot be expected to follow it.
    if created_date is not None and created_date <= PR_TEMPLATE_DATE:
        logger.info(
            "PR #%s is older than PR template (%s) -- skipping all checks.",
            pr_number,
            PR_TEMPLATE_DATE,
        )
        return

    # Authors with 5+ commits in the last ~3 years skip enforcement entirely.
    commit_count = get_recent_commit_count(
        pr_author, repo, token, since_days=365 * 3, max_count=5
    )
    if commit_count >= 5:
        logger.info(
            "PR #%s author is an established contributor -- skipping all checks.",
            pr_number,
        )
        return

    pr_title_result = SKIPPED
    total_changes = get_pr_total_changes(pr_number, repo, token)
    ticket_result = check_trac_ticket(pr_body, total_changes)
    ticket_status_result = SKIPPED
    ticket_has_patch_result = SKIPPED
    # The dependent Trac checks only run when the ticket check itself passed
    # and a ticket ID can actually be extracted from the body.
    ticket_id = extract_ticket_id(pr_body) if ticket_result is None else None
    if ticket_id:
        pr_title_result = check_pr_title_has_ticket(pr_title, ticket_id)
        ticket_data = fetch_trac_ticket(ticket_id)
        ticket_status_result = check_trac_status(ticket_id, ticket_data)
        if ticket_status_result is None:
            # Polling is disabled (poll_timeout=0): has_patch is a non-fatal
            # warning, and contributors often update Trac after reviewing their
            # PR, making any fixed polling window unreliable.
            ticket_has_patch_result = check_trac_has_patch(
                ticket_id, ticket_data, poll_timeout=0
            )
        else:
            logger.info("Trac ticket is not Accepted -- skipping has_patch check.")
    else:
        logger.info("No Trac ticket -- skipping status and has_patch checks.")

    # The AI disclosure section was added to the template later; older PRs
    # are exempt from it.
    if created_date is not None and created_date <= AI_DISCLOSURE_DATE:
        ai_disclosure_result = SKIPPED
        logger.info(
            "PR #%s is older than AI Disclosure section (%s) -- skipping AI checks.",
            pr_number,
            AI_DISCLOSURE_DATE,
        )
    else:
        ai_disclosure_result = check_ai_disclosure(pr_body)

    # (name, result, severity) triples; result is None, SKIPPED, or a Message.
    results = [
        ("Trac ticket referenced", ticket_result, LEVEL_ERROR),
        ("Trac ticket status is Accepted", ticket_status_result, LEVEL_ERROR),
        ("Trac ticket has_patch flag set", ticket_has_patch_result, LEVEL_WARNING),
        ("PR title includes ticket number", pr_title_result, LEVEL_WARNING),
        ("Branch description provided", check_branch_description(pr_body), LEVEL_ERROR),
        ("AI disclosure completed", ai_disclosure_result, LEVEL_ERROR),
        ("Checklist completed", check_checklist(pr_body), LEVEL_ERROR),
    ]
    write_job_summary(pr_number, results, summary_file)

    failures = [
        msg.as_details(level=level)
        for _, msg, level in results
        if msg is not None and msg is not SKIPPED and level == LEVEL_ERROR
    ]
    warning_msgs = [
        msg.as_details(level=level)
        for _, msg, level in results
        if msg is not None and msg is not SKIPPED and level == LEVEL_WARNING
    ]
    if not failures and not warning_msgs:
        logger.info("PR #%s passed all quality checks.", pr_number)
        return

    # Post a single comment that covers every failure and warning at once.
    github_request(
        "POST",
        f"/issues/{pr_number}/comments",
        token,
        repo,
        {"body": "\n\n".join([CHECKS_HEADER, *failures, *warning_msgs, CHECKS_FOOTER])},
    )
    if not failures:
        logger.warning(
            "PR #%s has %s warning(s), adding informational comment.",
            pr_number,
            len(warning_msgs),
        )
        return

    msg = "PR #%s failed %s check(s), adding comment with details."
    # The PR is only closed when autoclose is on AND the author has no recent
    # commits at all; exit code 1 is reserved for that close path.
    if not autoclose or commit_count > 0:
        logger.warning(
            msg + " Not closing the PR given %s.",
            pr_number,
            len(failures),
            "warning-only mode" if not autoclose else "recent contributions",
        )
    else:
        logger.error(
            msg + " Closing the PR given lack of recent contributions.",
            pr_number,
            len(failures),
        )
        github_request("PATCH", f"/pulls/{pr_number}", token, repo, {"state": "closed"})
        return 1
+
+
if __name__ == "__main__":
    # Required settings raise KeyError when absent; the rest have defaults.
    env = os.environ
    exit_code = main(
        repo=env["PR_REPO"],
        token=env["GITHUB_TOKEN"],
        pr_author=env.get("PR_AUTHOR", ""),
        pr_body=env.get("PR_BODY", ""),
        pr_number=env["PR_NUMBER"],
        pr_title=env.get("PR_TITLE", ""),
        pr_created_at=env.get("PR_CREATED_AT"),
        autoclose=env.get("AUTOCLOSE", "").lower() == "true",
        summary_file=env.get("GITHUB_STEP_SUMMARY"),
        gha_formatter=env.get("GITHUB_ACTIONS"),
    )
    sys.exit(exit_code)