about summary refs log tree commit diff
diff options
context:
space:
mode:
authorFranck Cuny <franck@fcuny.net>2022-10-23 13:20:03 -0700
committerFranck Cuny <franck@fcuny.net>2022-10-23 13:29:37 -0700
commit21e9b655e9e82be8775e9375ce113858dc5791dc (patch)
tree533e9390eef5f09133f8c8687c79a129da29c55b
parentref(tools/git-bootstrap): this is replaced by terraform (diff)
downloadworld-21e9b655e9e82be8775e9375ce113858dc5791dc.tar.gz
feat(tools/git-broom): CLI to delete local and remote branches
This tool helps to keep only the branches that are relevant: the ones
that have not been merged yet into the main branch on the principal
remote repository.
-rw-r--r--tools/default.nix1
-rw-r--r--tools/git-broom/default.nix25
-rwxr-xr-xtools/git-broom/git-broom.py350
3 files changed, 376 insertions, 0 deletions
diff --git a/tools/default.nix b/tools/default.nix
index 4078f35..543882f 100644
--- a/tools/default.nix
+++ b/tools/default.nix
@@ -6,5 +6,6 @@ pkgs.lib.makeScope pkgs.newScope (pkgs: {
   ipconverter = pkgs.callPackage ./ipconverter { };
   seqstat = pkgs.callPackage ./seqstat { };
   git-blame-stats = pkgs.callPackage ./git-blame-stats { };
+  git-broom = pkgs.callPackage ./git-broom { };
   sendsms = pkgs.callPackage ./sendsms { };
 })
diff --git a/tools/git-broom/default.nix b/tools/git-broom/default.nix
new file mode 100644
index 0000000..e25c6ec
--- /dev/null
+++ b/tools/git-broom/default.nix
@@ -0,0 +1,25 @@
+{ self, lib, python3, stdenvNoCC, pkgs }:
+
+stdenvNoCC.mkDerivation rec {
+  pname = "git-broom";
+  src = ./git-broom.py;
+  version = "0.1.0";
+
+  nativeBuildInputs = with pkgs; [ python3 ];
+
+  dontUnpack = true;
+  dontBuild = true;
+
+  installPhase = ''
+    mkdir -p $out/bin
+    cp $src $out/bin/${pname}
+  '';
+
+
+  meta = with pkgs.lib; {
+    description = "CLI to delete local and remote git branches that have been merged.";
+    license = licenses.mit;
+    platforms = platforms.unix;
+    maintainers = [ ];
+  };
+}
diff --git a/tools/git-broom/git-broom.py b/tools/git-broom/git-broom.py
new file mode 100755
index 0000000..8721b3c
--- /dev/null
+++ b/tools/git-broom/git-broom.py
@@ -0,0 +1,350 @@
+#!/usr/bin/env python3
+
+import argparse
+import os
+import re
+import subprocess
+import sys
+from typing import List, Dict
+
+import logging
+
+logging.basicConfig(format="[%(asctime)s]%(levelname)s:%(message)s", level=logging.INFO)
+
+# regular expression to find the name of the main branch on the remote
+re_match_remote_branch = re.compile(r"ref: refs/heads/(?P<branch>\S+)\tHEAD")
+
+# never delete any branches or references with one of these names
+immortal_ref = ["main", "master", "HEAD"]
+
+# that's how my remotes are usually named, and in that order of preference.
+preferred_remotes = ["origin", "github", "work"]
+
+
+class GitConfig(object):
+    """Represent the configuration for the git repository."""
+
+    def __init__(self) -> None:
+        self.guess_remote()
+        self.guess_primary_branch()
+        self.remote_ref = f"{self.remote_name}/{self.primary_branch}"
+        self.me = os.getenv("USER")
+
+    def guess_remote(self) -> None:
+        """Guess the name and URL for the remote repository.
+
+        If the name of the remote is from the list of preferred remote, we
+        return the name and URL.
+
+        If we don't have a remote set, throw an exception.
+        If we don't find any remote, throw an exception.
+        """
+        candidates = subprocess.run(
+            ["git", "config", "--get-regexp", "remote\.[a-z0-9]+.url"],
+            capture_output=True,
+            check=True,
+            encoding="utf-8",
+        ).stdout.splitlines()
+
+        if len(candidates) == 0:
+            raise ValueError("No remote is defined.")
+
+        remotes = dict()
+
+        for candidate in candidates:
+            parts = candidate.split(" ")
+            remote = parts[0].split(".")[1]
+            url = parts[1]
+            remotes[remote] = url
+
+        for remote in preferred_remotes:
+            if remote in remotes:
+                self.remote_name = remote
+                self.remote_url = remotes[remote]
+                return
+
+        raise ValueError("can't find the preferred remote.")
+
+    def guess_primary_branch(self) -> None:
+        """Guess the primary branch on the remote.
+
+        If we can't figure out the default branch, thrown an exception.
+        """
+        remote_head = subprocess.run(
+            ["git", "ls-remote", "--symref", self.remote_name, "HEAD"],
+            capture_output=True,
+            check=True,
+            encoding="utf-8",
+        ).stdout.splitlines()
+
+        for l in remote_head:
+            m = re_match_remote_branch.match(l)
+            if m:
+                self.primary_branch = m.group("branch")
+                return
+
+        raise ValueError(
+            f"can't find the name of the remote branch for {self.remote_name}"
+        )
+
+
+def is_git_repository() -> bool:
+    """Check if we are inside a git repository.
+
+    Return True if we are, false otherwise."""
+    res = subprocess.run(
+        ["git", "rev-parse", "--show-toplevel"], check=False, capture_output=True
+    )
+    return not res.returncode
+
+
+def fetch(remote: str):
+    """Fetch updates from the remote repository."""
+    subprocess.run(["git", "fetch", remote, "--prune"], capture_output=True, check=True)
+
+
+def ref_sha(ref: str) -> str:
+    """Get the sha from a ref."""
+    res = subprocess.run(
+        ["git", "show-ref", ref], capture_output=True, check=True, encoding="utf-8"
+    )
+    return res.stdout.rstrip()
+
+
+def get_branches(options: List[str]) -> List[str]:
+    """Get a list of branches."""
+    return subprocess.run(
+        ["git", "branch", "--format", "%(refname:short)"] + options,
+        capture_output=True,
+        check=True,
+        encoding="utf-8",
+    ).stdout.splitlines()
+
+
+def ref_tree(ref: str) -> str:
+    """Get the reference from a tree."""
+    return subprocess.run(
+        ["git", "rev-parse", f"{ref}^{{tree}}"],
+        check=True,
+        capture_output=True,
+        encoding="utf-8",
+    ).stdout.rstrip()
+
+
+def rebase_local_branches(config: GitConfig, local_rebase_tree_id: dict) -> None:
+    """Try to rebase the local branches that have been not been merged."""
+    for branch in get_branches(["--list", "--no-merged"]):
+        _rebase_local_branch(branch, config, local_rebase_tree_id)
+
+
+def _rebase_local_branch(
+    branch: str, config: GitConfig, local_rebase_tree_id: dict
+) -> None:
+    res = subprocess.run(
+        [
+            "git",
+            "merge-base",
+            "--is-ancestor",
+            config.remote_ref,
+            branch,
+        ],
+        check=False,
+        capture_output=True,
+    )
+    if res.returncode == 0:
+        logging.info(
+            f"local branch {branch} is already a descendant of {config.remote_ref}."
+        )
+        local_rebase_tree_id[branch] = ref_tree(branch)
+        return
+
+    logging.info(f"local branch {branch} will be rebased on {config.remote_ref}.")
+    subprocess.run(
+        ["git", "checkout", "--force", branch], check=True, capture_output=True
+    )
+    res = subprocess.run(
+        ["git", "rebase", config.remote_ref], check=True, capture_output=True
+    )
+    if res.returncode == 0:
+        logging.info(f"local branch {branch} has been rebased")
+        local_rebase_tree_id[branch] = ref_tree(branch)
+    else:
+        logging.error(f"failed to rebase local branch {branch}.")
+        subprocess.run(["git", "rebase", "--abort"], check=True)
+        subprocess.run(
+            ["git", "checkout", "--force", config.primary_branch], check=True
+        )
+        subprocess.run(["git", "reset", "--hard"], check=True)
+
+
+def rebase_remote_branches(
+    config: GitConfig, local_rebase_tree_id: dict, main_sha: str
+) -> None:
+    for branch in get_branches(
+        ["--list", "-r", f"{config.me}/*", "--no-merged", config.remote_ref]
+    ):
+        _rebase_remote_branches(branch, config, local_rebase_tree_id, main_sha)
+
+
+def _rebase_remote_branches(
+    branch: str, config: GitConfig, local_rebase_tree_id: dict, main_sha: str
+) -> None:
+    remote, head = branch.split("/")
+    if head in immortal_ref:
+        return
+
+    res = subprocess.run(
+        ["git", "merge-base", "--is-ancestor", config.remote_ref, branch],
+        check=False,
+        capture_output=True,
+    )
+    if res.returncode == 0:
+        logging.info(
+            f"local branch {branch} is already a descendant of {config.remote_ref}."
+        )
+        return
+
+    logging.info(f"remote branch {branch} will be rebased on {config.remote_ref}.")
+
+    sha = ref_sha(branch)
+    subprocess.run(["git", "checkout", "--force", sha], capture_output=True, check=True)
+    res = subprocess.run(
+        ["git", "rebase", config.remote_ref],
+        capture_output=True,
+        check=True,
+    )
+    if res.returncode == 0:
+        new_sha = ref_sha("--head")
+        short_sha = new_sha[0:8]
+        logging.info(f"remote branch {branch} at {sha} rebased to {new_sha}.")
+        if new_sha == main_sha:
+            logging.info(f"remote branch {branch}, when rebased, is already merged!")
+            logging.info(f"would run `git push {remote} :{head}'")
+        elif new_sha == sha:
+            logging.info(f"remote branch {branch}, when rebased, is unchanged!")
+        elif ref_tree(new_sha) == local_rebase_tree_id.get(head, ""):
+            logging.info(f"remote branch {branch}, when rebased, same as local branch!")
+            logging.info(f"would run `git push --force-with-lease {remote} {head}'")
+        else:
+            logging.info(
+                f"remote branch {branch} has been rebased to create {short_sha}!"
+            )
+            logging.info(
+                f"would run `git push --force-with-lease {remote} {new_sha}:{head}'"
+            )
+    else:
+        logging.error(f"failed to rebase remote branch {branch}.")
+        subprocess.run(["git", "rebase", "--abort"], check=True)
+        subprocess.run(
+            ["git", "checkout", "--force", config.primary_branch], check=True
+        )
+        subprocess.run(["git", "reset", "--hard"], check=True)
+
+
+def destroy_remote_merged_branches(config: GitConfig, dry_run: bool) -> None:
+    """Destroy remote branches that have been merged."""
+    for branch in get_branches(
+        ["--list", "-r", f"{config.me}/*", "--merged", config.remote_ref]
+    ):
+        remote, head = branch.split("/")
+        if head in immortal_ref:
+            continue
+        logging.info(f"remote branch {branch} has been merged")
+        if dry_run:
+            logging.info(f"would have run git push {remote} :{head}")
+        else:
+            subprocess.run(
+                ["git", "push", remote, f":{head}"], check=True, encoding="utf-8"
+            )
+
+
+def destroy_local_merged_branches(config: GitConfig, dry_run: bool) -> None:
+    """Destroy local branches that have been merged."""
+    for branch in get_branches(["--list", "--merged", config.remote_ref]):
+        if branch in immortal_ref:
+            continue
+
+        logging.info(f"local branch {branch} has been merged")
+        if dry_run:
+            logging.info(f"would have run git branch --delete --force {branch}")
+        else:
+            subprocess.run(
+                ["git", "branch", "--delete", "--force", branch],
+                check=True,
+                encoding="utf-8",
+            )
+
+
+def workdir_is_clean() -> bool:
+    """Check the git workdir is clean."""
+    res = subprocess.run(
+        ["git", "status", "--porcelain"],
+        check=True,
+        capture_output=True,
+        encoding="utf-8",
+    ).stdout.splitlines()
+    return not len(res)
+
+
+def main(dry_run: bool) -> bool:
+    if not is_git_repository():
+        logging.error("error: run this inside a git repository")
+        return False
+
+    if not workdir_is_clean():
+        logging.error("the git workdir is not clean, commit or stash your changes.")
+        return False
+
+    config = GitConfig()
+
+    # what's our current sha ?
+    origin_main_sha = ref_sha(config.remote_ref)
+
+    # let's get everything up to date
+    fetch(config.remote_name)
+
+    # let's get the new sha
+    main_sha = ref_sha(config.remote_ref)
+
+    if origin_main_sha != main_sha:
+        logging.info(f"we started with {origin_main_sha} and now we have {main_sha}")
+
+    local_rebase_tree_id: Dict[str, str] = dict()
+
+    # try to rebase local branches that have been not been merged
+    rebase_local_branches(config, local_rebase_tree_id)
+
+    # try to rebase remote branches that have been not been merged
+    rebase_remote_branches(config, local_rebase_tree_id, main_sha)
+
+    # let's checkout to main now and see what left to do
+    subprocess.run(
+        ["git", "checkout", "--force", config.primary_branch],
+        check=True,
+        capture_output=True,
+    )
+
+    # branches on the remote that have been merged can be destroyed.
+    destroy_remote_merged_branches(config, dry_run)
+
+    # local branches that have been merged can be destroyed.
+    destroy_local_merged_branches(config, dry_run)
+
+    # TODO: restore to the branch I was on before ?
+    return True
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(
+        description="delete local and remote branches that have been merged."
+    )
+    parser.add_argument(
+        "--dry-run",
+        action=argparse.BooleanOptionalAction,
+        help="when set to True, do not execute the destructive actions",
+        default=True,
+    )
+    args = parser.parse_args()
+
+    if not main(args.dry_run):
+        sys.exit(1)