From 21e9b655e9e82be8775e9375ce113858dc5791dc Mon Sep 17 00:00:00 2001 From: Franck Cuny Date: Sun, 23 Oct 2022 13:20:03 -0700 Subject: feat(tools/git-broom): CLI to delete local and remote branches This tool helps to keep only the branches that are relevant: the ones that have not been merged yet into the main branch on the principal remote repository. --- tools/default.nix | 1 + tools/git-broom/default.nix | 25 ++++ tools/git-broom/git-broom.py | 350 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 376 insertions(+) create mode 100644 tools/git-broom/default.nix create mode 100755 tools/git-broom/git-broom.py diff --git a/tools/default.nix b/tools/default.nix index 4078f35..543882f 100644 --- a/tools/default.nix +++ b/tools/default.nix @@ -6,5 +6,6 @@ pkgs.lib.makeScope pkgs.newScope (pkgs: { ipconverter = pkgs.callPackage ./ipconverter { }; seqstat = pkgs.callPackage ./seqstat { }; git-blame-stats = pkgs.callPackage ./git-blame-stats { }; + git-broom = pkgs.callPackage ./git-broom { }; sendsms = pkgs.callPackage ./sendsms { }; }) diff --git a/tools/git-broom/default.nix b/tools/git-broom/default.nix new file mode 100644 index 0000000..e25c6ec --- /dev/null +++ b/tools/git-broom/default.nix @@ -0,0 +1,25 @@ +{ self, lib, python3, stdenvNoCC, pkgs }: + +stdenvNoCC.mkDerivation rec { + pname = "git-broom"; + src = ./git-broom.py; + version = "0.1.0"; + + nativeBuildInputs = with pkgs; [ python3 ]; + + dontUnpack = true; + dontBuild = true; + + installPhase = '' + mkdir -p $out/bin + cp $src $out/bin/${pname} + ''; + + + meta = with pkgs.lib; { + description = "CLI to delete local and remote git branches that have been merged."; + license = licenses.mit; + platforms = platforms.unix; + maintainers = [ ]; + }; +} diff --git a/tools/git-broom/git-broom.py b/tools/git-broom/git-broom.py new file mode 100755 index 0000000..8721b3c --- /dev/null +++ b/tools/git-broom/git-broom.py @@ -0,0 +1,350 @@ +#!/usr/bin/env python3 + +import argparse +import os +import re +import subprocess +import sys +from typing import List, Dict + +import logging + +logging.basicConfig(format="[%(asctime)s]%(levelname)s:%(message)s", level=logging.INFO) + +# regular expression to find the name of the main branch on the remote +re_match_remote_branch = re.compile(r"ref: refs/heads/(?P\S+)\tHEAD") + +# never delete any branches or references with one of these names +immortal_ref = ["main", "master", "HEAD"] + +# that's how my remotes are usually named, and in that order of preference. +preferred_remotes = ["origin", "github", "work"] + + +class GitConfig(object): + """Represent the configuration for the git repository.""" + + def __init__(self) -> None: + self.guess_remote() + self.guess_primary_branch() + self.remote_ref = f"{self.remote_name}/{self.primary_branch}" + self.me = os.getenv("USER") + + def guess_remote(self) -> None: + """Guess the name and URL for the remote repository. + + If the name of the remote is from the list of preferred remote, we + return the name and URL. + + If we don't have a remote set, throw an exception. + If we don't find any remote, throw an exception. + """ + candidates = subprocess.run( + ["git", "config", "--get-regexp", "remote\.[a-z0-9]+.url"], + capture_output=True, + check=True, + encoding="utf-8", + ).stdout.splitlines() + + if len(candidates) == 0: + raise ValueError("No remote is defined.") + + remotes = dict() + + for candidate in candidates: + parts = candidate.split(" ") + remote = parts[0].split(".")[1] + url = parts[1] + remotes[remote] = url + + for remote in preferred_remotes: + if remote in remotes: + self.remote_name = remote + self.remote_url = remotes[remote] + return + + raise ValueError("can't find the preferred remote.") + + def guess_primary_branch(self) -> None: + """Guess the primary branch on the remote. + + If we can't figure out the default branch, thrown an exception. + """ + remote_head = subprocess.run( + ["git", "ls-remote", "--symref", self.remote_name, "HEAD"], + capture_output=True, + check=True, + encoding="utf-8", + ).stdout.splitlines() + + for l in remote_head: + m = re_match_remote_branch.match(l) + if m: + self.primary_branch = m.group("branch") + return + + raise ValueError( + f"can't find the name of the remote branch for {self.remote_name}" + ) + + +def is_git_repository() -> bool: + """Check if we are inside a git repository. + + Return True if we are, false otherwise.""" + res = subprocess.run( + ["git", "rev-parse", "--show-toplevel"], check=False, capture_output=True + ) + return not res.returncode + + +def fetch(remote: str): + """Fetch updates from the remote repository.""" + subprocess.run(["git", "fetch", remote, "--prune"], capture_output=True, check=True) + + +def ref_sha(ref: str) -> str: + """Get the sha from a ref.""" + res = subprocess.run( + ["git", "show-ref", ref], capture_output=True, check=True, encoding="utf-8" + ) + return res.stdout.rstrip() + + +def get_branches(options: List[str]) -> List[str]: + """Get a list of branches.""" + return subprocess.run( + ["git", "branch", "--format", "%(refname:short)"] + options, + capture_output=True, + check=True, + encoding="utf-8", + ).stdout.splitlines() + + +def ref_tree(ref: str) -> str: + """Get the reference from a tree.""" + return subprocess.run( + ["git", "rev-parse", f"{ref}^{{tree}}"], + check=True, + capture_output=True, + encoding="utf-8", + ).stdout.rstrip() + + +def rebase_local_branches(config: GitConfig, local_rebase_tree_id: dict) -> None: + """Try to rebase the local branches that have been not been merged.""" + for branch in get_branches(["--list", "--no-merged"]): + _rebase_local_branch(branch, config, local_rebase_tree_id) + + +def _rebase_local_branch( + branch: str, config: GitConfig, local_rebase_tree_id: dict +) -> None: + res = subprocess.run( + [ + "git", + "merge-base", + "--is-ancestor", + config.remote_ref, + branch, + ], + check=False, + capture_output=True, + ) + if res.returncode == 0: + logging.info( + f"local branch {branch} is already a descendant of {config.remote_ref}." + ) + local_rebase_tree_id[branch] = ref_tree(branch) + return + + logging.info(f"local branch {branch} will be rebased on {config.remote_ref}.") + subprocess.run( + ["git", "checkout", "--force", branch], check=True, capture_output=True + ) + res = subprocess.run( + ["git", "rebase", config.remote_ref], check=True, capture_output=True + ) + if res.returncode == 0: + logging.info(f"local branch {branch} has been rebased") + local_rebase_tree_id[branch] = ref_tree(branch) + else: + logging.error(f"failed to rebase local branch {branch}.") + subprocess.run(["git", "rebase", "--abort"], check=True) + subprocess.run( + ["git", "checkout", "--force", config.primary_branch], check=True + ) + subprocess.run(["git", "reset", "--hard"], check=True) + + +def rebase_remote_branches( + config: GitConfig, local_rebase_tree_id: dict, main_sha: str +) -> None: + for branch in get_branches( + ["--list", "-r", f"{config.me}/*", "--no-merged", config.remote_ref] + ): + _rebase_remote_branches(branch, config, local_rebase_tree_id, main_sha) + + +def _rebase_remote_branches( + branch: str, config: GitConfig, local_rebase_tree_id: dict, main_sha: str +) -> None: + remote, head = branch.split("/") + if head in immortal_ref: + return + + res = subprocess.run( + ["git", "merge-base", "--is-ancestor", config.remote_ref, branch], + check=False, + capture_output=True, + ) + if res.returncode == 0: + logging.info( + f"local branch {branch} is already a descendant of {config.remote_ref}." + ) + return + + logging.info(f"remote branch {branch} will be rebased on {config.remote_ref}.") + + sha = ref_sha(branch) + subprocess.run(["git", "checkout", "--force", sha], capture_output=True, check=True) + res = subprocess.run( + ["git", "rebase", config.remote_ref], + capture_output=True, + check=True, + ) + if res.returncode == 0: + new_sha = ref_sha("--head") + short_sha = new_sha[0:8] + logging.info(f"remote branch {branch} at {sha} rebased to {new_sha}.") + if new_sha == main_sha: + logging.info(f"remote branch {branch}, when rebased, is already merged!") + logging.info(f"would run `git push {remote} :{head}'") + elif new_sha == sha: + logging.info(f"remote branch {branch}, when rebased, is unchanged!") + elif ref_tree(new_sha) == local_rebase_tree_id.get(head, ""): + logging.info(f"remote branch {branch}, when rebased, same as local branch!") + logging.info(f"would run `git push --force-with-lease {remote} {head}'") + else: + logging.info( + f"remote branch {branch} has been rebased to create {short_sha}!" + ) + logging.info( + f"would run `git push --force-with-lease {remote} {new_sha}:{head}'" + ) + else: + logging.error(f"failed to rebase remote branch {branch}.") + subprocess.run(["git", "rebase", "--abort"], check=True) + subprocess.run( + ["git", "checkout", "--force", config.primary_branch], check=True + ) + subprocess.run(["git", "reset", "--hard"], check=True) + + +def destroy_remote_merged_branches(config: GitConfig, dry_run: bool) -> None: + """Destroy remote branches that have been merged.""" + for branch in get_branches( + ["--list", "-r", f"{config.me}/*", "--merged", config.remote_ref] + ): + remote, head = branch.split("/") + if head in immortal_ref: + continue + logging.info(f"remote branch {branch} has been merged") + if dry_run: + logging.info(f"would have run git push {remote} :{head}") + else: + subprocess.run( + ["git", "push", remote, f":{head}"], check=True, encoding="utf-8" + ) + + +def destroy_local_merged_branches(config: GitConfig, dry_run: bool) -> None: + """Destroy local branches that have been merged.""" + for branch in get_branches(["--list", "--merged", config.remote_ref]): + if branch in immortal_ref: + continue + + logging.info(f"local branch {branch} has been merged") + if dry_run: + logging.info(f"would have run git branch --delete --force {branch}") + else: + subprocess.run( + ["git", "branch", "--delete", "--force", branch], + check=True, + encoding="utf-8", + ) + + +def workdir_is_clean() -> bool: + """Check the git workdir is clean.""" + res = subprocess.run( + ["git", "status", "--porcelain"], + check=True, + capture_output=True, + encoding="utf-8", + ).stdout.splitlines() + return not len(res) + + +def main(dry_run: bool) -> bool: + if not is_git_repository(): + logging.error("error: run this inside a git repository") + return False + + if not workdir_is_clean(): + logging.error("the git workdir is not clean, commit or stash your changes.") + return False + + config = GitConfig() + + # what's our current sha ? + origin_main_sha = ref_sha(config.remote_ref) + + # let's get everything up to date + fetch(config.remote_name) + + # let's get the new sha + main_sha = ref_sha(config.remote_ref) + + if origin_main_sha != main_sha: + logging.info(f"we started with {origin_main_sha} and now we have {main_sha}") + + local_rebase_tree_id: Dict[str, str] = dict() + + # try to rebase local branches that have been not been merged + rebase_local_branches(config, local_rebase_tree_id) + + # try to rebase remote branches that have been not been merged + rebase_remote_branches(config, local_rebase_tree_id, main_sha) + + # let's checkout to main now and see what left to do + subprocess.run( + ["git", "checkout", "--force", config.primary_branch], + check=True, + capture_output=True, + ) + + # branches on the remote that have been merged can be destroyed. + destroy_remote_merged_branches(config, dry_run) + + # local branches that have been merged can be destroyed. + destroy_local_merged_branches(config, dry_run) + + # TODO: restore to the branch I was on before ? + return True + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="delete local and remote branches that have been merged." + ) + parser.add_argument( + "--dry-run", + action=argparse.BooleanOptionalAction, + help="when set to True, do not execute the destructive actions", + default=True, + ) + args = parser.parse_args() + + if not main(args.dry_run): + sys.exit(1) -- cgit 1.4.1