Statistiques de PR fusionnées par auteur et par semaine

Ce script récupère, via l’API GitHub, le nombre de pull requests (PR) fusionnées pour un ou plusieurs dépôts, les regroupe par auteur et par semaine sur l’année écoulée, puis enregistre les graphiques sous forme d’images PNG.

Les données récupérées sont mises en cache localement (un fichier CSV par dépôt). Lors des exécutions suivantes, seules les PR plus récentes que la dernière date mise en cache sont requêtées.

Dépendances : requests, pandas, matplotlib.

Usage :

export GITHUB_TOKEN=ghp_xxxxxxxxxxxxxxxxxxxx  # optionnel mais recommandé
python github_stat_pr.py

Images générées :

  • github_stat_pr_bar.png — diagramme empilé (toutes repos confondues)

  • github_stat_pr_heatmap.png — heatmap (toutes repos confondues)

  • github_stat_pr_lines.png — graphe en lignes comparant les dépôts

# coding: utf-8
"""
Script : statistiques de PR fusionnées par auteur et par semaine
================================================================

Ce script récupère, via l'API GitHub, le nombre de *pull requests* (PR) fusionnées
pour **un ou plusieurs dépôts**, les regroupe par auteur et par semaine sur l'année
écoulée, puis enregistre les graphiques sous forme d'images PNG.

Les données récupérées sont **mises en cache** localement (un fichier CSV par dépôt).
Lors des exécutions suivantes, seules les PR plus récentes que la dernière date mise
en cache sont requêtées, ce qui réduit le nombre d'appels à l'API.

**Dépendances :** ``requests``, ``pandas``, ``matplotlib``.

**Token GitHub :** définissez la variable d'environnement ``GITHUB_TOKEN`` avec un
*Personal Access Token* (PAT) GitHub pour dépasser la limite de 60 requêtes/heure :

.. code-block:: bash

    export GITHUB_TOKEN=ghp_xxxxxxxxxxxxxxxxxxxx

**Usage :**

.. code-block:: bash

    python github_stat_pr.py

Les images sont enregistrées dans le répertoire courant :

* ``github_stat_pr_bar.png`` — diagramme empilé (toutes repos confondues)
* ``github_stat_pr_heatmap.png`` — heatmap (toutes repos confondues)
* ``github_stat_pr_lines.png`` — graphe en lignes comparant les dépôts
* ``github_stat_pr_bar_{owner}_{repo}.png`` — diagramme empilé par dépôt
* ``github_stat_pr_heatmap_{owner}_{repo}.png`` — heatmap par dépôt
"""

import datetime
import os
import pathlib

import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import pandas as pd
import requests

# ---------------------------------------------------------------------------
# Paramètres
# ---------------------------------------------------------------------------

REPOS = [
    # ("sdpython", "teachpyx"),
    # ("sdpython", "teachcompute"),
    # ("sdpython", "onnx-extended"),
    ("sdpython", "onnx-diagnostic"),
    ("sdpython", "experimental-experiment"),
    ("xadupre", "yet-another-onnx-builder"),
    ("xadupre", "mbext"),
    ("onnx", "sklearn-onnx"),
    ("onnx", "onnxmltools"),
    # ("sdpython", "onnx-extended"),  # ajoutez d'autres dépôts ici
]

# Répertoire de cache (créé automatiquement si nécessaire)
CACHE_DIR = pathlib.Path(".")

# Liste blanche d'auteurs : seuls ces auteurs seront inclus dans l'analyse.
# Laissez vide ([]) pour inclure tous les auteurs.
AUTHOR_WHITELIST: list[str] = ["xadupre", "sdpython", "Copilot", "dependabot[bot]"]

# Répertoire de sortie pour les images PNG
OUTPUT_DIR = pathlib.Path(".")

# Jeton d'authentification GitHub (optionnel mais recommandé)
GITHUB_TOKEN = os.environ.get("GITHUB_TOKEN", "")

# ---------------------------------------------------------------------------
# Cache
# ---------------------------------------------------------------------------

CACHE_DATE_FMT = "%Y-%m-%dT%H:%M:%S%z"


def _cache_path(cache_dir: pathlib.Path, owner: str, repo: str) -> pathlib.Path:
    safe = f"{owner}_{repo}".replace("/", "_")
    return cache_dir / f"prs_cache_{safe}.csv"


def load_cache(cache_dir: pathlib.Path, owner: str, repo: str) -> pd.DataFrame:
    path = _cache_path(cache_dir, owner, repo)
    if not path.exists():
        return pd.DataFrame(columns=["author", "merged_at", "repo"])
    df = pd.read_csv(path, parse_dates=["merged_at"])
    if df["merged_at"].dt.tz is None:
        df["merged_at"] = df["merged_at"].dt.tz_localize("UTC")
    else:
        df["merged_at"] = df["merged_at"].dt.tz_convert("UTC")
    return df


def save_cache(
    cache_dir: pathlib.Path, owner: str, repo: str, df: pd.DataFrame
) -> None:
    cache_dir.mkdir(parents=True, exist_ok=True)
    path = _cache_path(cache_dir, owner, repo)
    df.to_csv(path, index=False, date_format=CACHE_DATE_FMT)


# ---------------------------------------------------------------------------
# Récupération des PR via l'API GitHub
# ---------------------------------------------------------------------------


def fetch_merged_prs(
    owner: str,
    repo: str,
    token: str = "",
    fetch_since: datetime.datetime | None = None,
) -> list[dict]:
    """Récupère les PR fusionnées pour un dépôt à partir d'une date donnée.

    :param owner: propriétaire du dépôt GitHub
    :param repo: nom du dépôt GitHub
    :param token: jeton d'authentification GitHub (optionnel)
    :param fetch_since: date de départ de la recherche ; si ``None``, remonte
        jusqu'à 365 jours en arrière.
    :return: liste de dictionnaires ``{author, merged_at, repo}``
    """
    headers = {"Accept": "application/vnd.github+json"}
    if token:
        headers["Authorization"] = f"Bearer {token}"

    cutoff = (
        fetch_since
        if fetch_since is not None
        else (
            datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=365)
        )
    )

    results = []
    page = 1
    per_page = 100

    while True:
        url = (
            f"https://api.github.com/repos/{owner}/{repo}/pulls"
            f"?state=closed&per_page={per_page}&page={page}&sort=updated&direction=desc"
        )
        response = requests.get(url, headers=headers, timeout=30)
        try:
            response.raise_for_status()
        except requests.HTTPError as exc:
            status = exc.response.status_code
            if status == 401:
                raise RuntimeError(
                    "Authentification refusée (401). Vérifiez votre GITHUB_TOKEN."
                ) from exc
            if status == 403:
                print(
                    "Accès refusé (403). Vous avez peut-être atteint la limite de "
                    "l'API GitHub (60 requêtes/h sans token). Définissez GITHUB_TOKEN."
                )
                break
            if status == 404:
                raise RuntimeError(
                    f"Dépôt introuvable (404) : {owner}/{repo}. "
                    "Vérifiez OWNER et REPO."
                ) from exc
            raise
        prs = response.json()

        if not prs:
            break

        stop = False
        for pr in prs:
            merged_at = pr.get("merged_at")
            if not merged_at:
                continue
            merged_dt = datetime.datetime.fromisoformat(
                merged_at.replace("Z", "+00:00")
            )
            if merged_dt <= cutoff:
                stop = True
                break
            author = (pr.get("user") or {}).get("login", "unknown")
            results.append(
                {"author": author, "merged_at": merged_dt, "repo": f"{owner}/{repo}"}
            )

        if stop:
            break

        page += 1

    return results


def load_prs_with_cache(
    owner: str,
    repo: str,
    token: str = "",
    cache_dir: pathlib.Path = pathlib.Path("."),
) -> pd.DataFrame:
    """Charge les PR fusionnées en utilisant le cache local.

    :return: DataFrame avec les colonnes ``author``, ``merged_at``, ``repo``
    """
    now = datetime.datetime.now(datetime.timezone.utc)
    cutoff_365 = now - datetime.timedelta(days=365)

    cached_df = load_cache(cache_dir, owner, repo)

    if cached_df.empty:
        fetch_since = None
        print(f"  {owner}/{repo} : cache vide, récupération complète…")
    else:
        latest = cached_df["merged_at"].max()
        fetch_since = latest.replace(hour=0, minute=0, second=0, microsecond=0)
        print(
            f"  {owner}/{repo} : cache chargé ({len(cached_df)} entrées), "
            f"récupération des PR depuis {fetch_since.date()}…"
        )

    new_prs = fetch_merged_prs(owner, repo, token, fetch_since=fetch_since)
    print(f"    → {len(new_prs)} nouvelle(s) PR(s) récupérée(s) via l'API.")

    if new_prs:
        new_df = pd.DataFrame(new_prs)
        if cached_df.shape[0]:
            combined = pd.concat([cached_df, new_df], ignore_index=True)
        else:
            combined = new_df
    else:
        combined = cached_df.copy()

    combined.drop_duplicates(subset=["repo", "author", "merged_at"], inplace=True)
    combined = combined[combined["merged_at"] >= cutoff_365].copy()
    combined.sort_values("merged_at", inplace=True)
    combined.reset_index(drop=True, inplace=True)

    save_cache(cache_dir, owner, repo, combined)
    print(f"    → cache mis à jour ({len(combined)} entrées au total).")

    return combined


# ---------------------------------------------------------------------------
# Agrégation
# ---------------------------------------------------------------------------


def aggregate_weekly(df: pd.DataFrame) -> pd.DataFrame:
    """Regroupe les PR par (repo, author, week) et retourne un DataFrame long."""
    df = df.copy()
    df["week"] = df["merged_at"].dt.to_period("W").dt.start_time
    return df.groupby(["repo", "author", "week"]).size().reset_index(name="pr_count")


def make_pivot(weekly: pd.DataFrame) -> pd.DataFrame:
    """Construit le tableau croisé auteur x semaine trié par total décroissant."""
    pivot = weekly.pivot_table(
        index="author", columns="week", values="pr_count", aggfunc="sum", fill_value=0
    )
    return pivot.loc[pivot.sum(axis=1).sort_values(ascending=False).index]


# ---------------------------------------------------------------------------
# Visualisation
# ---------------------------------------------------------------------------


def plot_bar(pivot: pd.DataFrame, title: str, output_path: pathlib.Path) -> None:
    """Diagramme à barres empilées (auteur x semaine) enregistré en PNG."""
    fig, ax = plt.subplots(figsize=(14, 5))
    stacked_height = None
    week_nums = mdates.date2num(pivot.columns.to_pydatetime())

    for author in pivot.index:
        values = pivot.loc[author].values
        if stacked_height is None:
            ax.bar(week_nums, values, width=5, label=author)
            stacked_height = values.copy()
        else:
            ax.bar(week_nums, values, width=5, bottom=stacked_height, label=author)
            stacked_height += values

    ax.xaxis.set_major_formatter(mdates.DateFormatter("%Y-%m-%d"))
    ax.xaxis.set_major_locator(mdates.WeekdayLocator(byweekday=mdates.MO, interval=4))
    plt.xticks(rotation=45, ha="right")
    ax.set_xlabel("Week")
    ax.set_ylabel("PR merged per week")
    ax.set_title(title)
    ax.yaxis.set_major_locator(ticker.MultipleLocator(50))
    ax.yaxis.set_minor_locator(ticker.MultipleLocator(10))
    ax.grid(which="major")
    ax.grid(which="minor")
    ax.legend(loc="upper left", bbox_to_anchor=(1, 1), title="Auteur")
    plt.tight_layout()
    plt.savefig(output_path, dpi=150)
    plt.close(fig)
    print(f"  → {output_path}")


def plot_heatmap(pivot: pd.DataFrame, title: str, output_path: pathlib.Path) -> None:
    """Heatmap auteur x semaine enregistrée en PNG."""
    fig, ax = plt.subplots(figsize=(14, max(3, len(pivot) * 0.5)))
    im = ax.imshow(pivot.values, aspect="auto", cmap="YlOrRd")
    plt.colorbar(im, ax=ax, label="Nombre de PR")

    ax.set_yticks(range(len(pivot.index)))
    ax.set_yticklabels(pivot.index)

    step = max(1, len(pivot.columns) // 12)
    ax.set_xticks(range(0, len(pivot.columns), step))
    ax.set_xticklabels(
        [str(d)[:10] for d in pivot.columns[::step]], rotation=45, ha="right"
    )

    ax.set_title(title)
    ax.set_xlabel("Semaine")
    ax.set_ylabel("Auteur")
    ax.yaxis.set_major_locator(ticker.MultipleLocator(50))
    ax.yaxis.set_minor_locator(ticker.MultipleLocator(10))
    ax.grid(which="major")
    ax.grid(which="minor")
    plt.tight_layout()
    plt.savefig(output_path, dpi=150)
    plt.close(fig)
    print(f"  → {output_path}")


def plot_lines_by_repo(
    weekly: pd.DataFrame, title: str, output_path: pathlib.Path
) -> None:
    """Graphe en lignes : total de PR fusionnées par semaine pour chaque dépôt.

    Chaque dépôt est représenté par une ligne, ce qui permet de comparer
    visuellement l'activité entre dépôts.
    """
    repo_weekly = weekly.groupby(["repo", "week"])["pr_count"].sum().reset_index()
    all_weeks = sorted(repo_weekly["week"].unique())

    fig, ax = plt.subplots(figsize=(14, 5))
    for repo_name, grp in repo_weekly.groupby("repo"):
        grp_indexed = grp.set_index("week").reindex(all_weeks, fill_value=0)
        week_nums = mdates.date2num(pd.to_datetime(grp_indexed.index).to_pydatetime())
        ax.plot(
            week_nums,
            grp_indexed["pr_count"].values,
            marker="o",
            markersize=3,
            label=repo_name,
        )

    ax.xaxis.set_major_formatter(mdates.DateFormatter("%Y-%m-%d"))
    ax.xaxis.set_major_locator(mdates.WeekdayLocator(byweekday=mdates.MO, interval=4))
    plt.xticks(rotation=45, ha="right")
    ax.set_xlabel("Week")
    ax.set_ylabel("PR merged per week")
    ax.set_title(title)
    ax.legend(loc="upper left", bbox_to_anchor=(1, 1), title="Dépôt")
    ax.yaxis.set_major_locator(ticker.MultipleLocator(50))
    ax.yaxis.set_minor_locator(ticker.MultipleLocator(10))
    ax.grid(which="major")
    ax.grid(which="minor")
    plt.tight_layout()
    plt.savefig(output_path, dpi=150)
    plt.close(fig)
    print(f"  → {output_path}")


# ---------------------------------------------------------------------------
# Point d'entrée
# ---------------------------------------------------------------------------


def main() -> None:
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    # 1. Récupération des données
    print("Récupération des PR fusionnées…")
    frames = []
    for owner, repo in REPOS:
        frames.append(load_prs_with_cache(owner, repo, GITHUB_TOKEN, CACHE_DIR))
    if not frames:
        print("Aucune donnée.")
        return

    merged_prs = pd.concat(frames, ignore_index=True)
    print(f"\nTotal : {len(merged_prs)} PR(s) fusionnée(s).")

    # 2. Filtre par liste blanche
    df = merged_prs.copy()
    if AUTHOR_WHITELIST:
        df = df[df["author"].isin(AUTHOR_WHITELIST)].copy()
        if df.empty:
            print("Aucun auteur de la liste blanche trouvé dans les données.")
            return

    # 3. Agrégation
    weekly = aggregate_weekly(df)
    pivot_all = make_pivot(weekly)

    # Agrégation non filtrée pour le graphe de comparaison entre dépôts
    weekly_all = aggregate_weekly(merged_prs)

    # 4. Graphiques combinés (toutes repos)
    print("\nGénération des graphiques combinés…")
    plot_bar(pivot_all, "PR merged per week", OUTPUT_DIR / "github_stat_pr_bar.png")
    plot_heatmap(
        pivot_all, "Heatmap of merged PR", OUTPUT_DIR / "github_stat_pr_heatmap.png"
    )

    # 4b. Graphe en lignes : une ligne par dépôt, auteurs agrégés (données non filtrées)
    if len(REPOS) > 1:
        plot_lines_by_repo(
            weekly_all,
            "PR merged per week / repositories",
            OUTPUT_DIR / "github_stat_pr_lines.png",
        )

    print("\nTerminé.")


if __name__ == "__main__":
    main()