diff options
Diffstat (limited to 'src/cli/hashienv.py')
-rwxr-xr-x | src/cli/hashienv.py | 202 |
1 files changed, 0 insertions, 202 deletions
diff --git a/src/cli/hashienv.py b/src/cli/hashienv.py deleted file mode 100755 index 57708a8..0000000 --- a/src/cli/hashienv.py +++ /dev/null @@ -1,202 +0,0 @@ -#!/usr/bin/env python3 - -import os -import json -import hashlib -import datetime - -import click -import requests - -# the ID of the vault in 1password -op_vault = "v4mof5qwozyvob2utdk3crwxnu" - -vault_addrs = { - "chi1": "https://chi1-vault.simulprod.com:8200", - "ash1": "https://ash1-vault.simulprod.com:8200", -} - -creds_cache = os.path.join(os.getenv("HOME") or "", ".local/state/rbxenv") - - -valid_dcs = ["ash1", "chi1"] -valid_edges = [ - "bom1", - "ams1", - "atl1", - "dwf1", - "fra2", - "hkg1", - "iad1", - "lax1", - "lga1", - "lhr1", - "mia1", - "nrt1", - "ord1", - "ord2", - "sjfc1", -] - - -def _path_cached_file(val: str) -> str: - """The path to the cache file. - The full path is created using the URL to the vault server. - """ - m = hashlib.sha256() - m.update(bytes(val, "utf-8")) - val = m.hexdigest() - return os.path.join(creds_cache, f"{val}.json") - - -def get_ldap_password() -> str: - """Return the LDAP password. - The password is expected to be in 1password, under `LDAP'. - """ - return os.popen(f"/usr/local/bin/op read op://{op_vault}/LDAP/password").read().rstrip() - - -def _get_token_from_cache(addr: str) -> str | None: - """Return the token from the cache if it is still valid.""" - cached_path = _path_cached_file(addr) - if not os.path.isfile(cached_path): - return None - - with open(cached_path) as f: - d = json.load(f) - expires_after = datetime.datetime.fromtimestamp(int(d["until"])) - if datetime.datetime.now() > expires_after: - return None - return d["token"] - - return None - - -def _set_token_to_cache(addr: str, token: str, expires_after: datetime.datetime): - """Set the token in the cache. - The cache also contains the time after which the token won't be valid anymore.""" - if not os.path.isdir(creds_cache): - os.makedirs(creds_cache) - - cache = { - "until": int(expires_after.timestamp()), - "token": token, - } - - cached_path = _path_cached_file(addr) - - with open(cached_path, "w") as f: - json.dump(cache, f) - - -def _vault_login(addr: str) -> str: - """Log into vault to get a token. - If we get a token, we store it in the cache so we don't need to request it again until it expires. - """ - ldap_username = os.getenv("USER") - ldap_password = get_ldap_password() - - url = "{}/v1/auth/ldap/login/{}".format(addr, ldap_username) - obj = {"method": "push", "password": ldap_password} - - try: - resp = requests.post(url, json=obj) - resp.raise_for_status() - except Exception as e: - print("{} returned {}".format(url, str(e))) - - expires_after = datetime.datetime.now() + datetime.timedelta(seconds=resp.json()["auth"]["lease_duration"]) - token = resp.json()["auth"]["client_token"] - - _set_token_to_cache(addr, token, expires_after) - - return token - - -def get_vault_token(addr: str) -> str: - """Get the token for vault.""" - token = _get_token_from_cache(addr) - if token is None: - token = _vault_login(addr) - return token - - -def vault_read(path: str, addr: str, token: str, dc: str) -> str: - """Read some values from a path in vault.""" - url = "{}/v1/{}".format(addr, path) - headers = {"x-vault-token": token} - try: - resp = requests.get(url, headers=headers) - resp.raise_for_status() - except Exception as e: - print("{} returned {}".format(url, str(e))) - - return resp.json()["data"][dc] - - -@click.command() -@click.argument("dc") -def setpop(dc: str): - """Print some information regarding hashi stack in a POP.""" - if dc not in valid_edges: - print(f"{dc} is not a valid edge location") - return - - token = get_vault_token(vault_addrs["chi1"]) - consul_token = vault_read( - "secret/teams/neteng/traffic/consul", - addr=vault_addrs["chi1"], - token=token, - dc=dc, - ) - nomad_token = vault_read( - "secret/teams/neteng/traffic/nomad", - addr=vault_addrs["chi1"], - token=token, - dc=dc, - ) - vault_token = vault_read( - "secret/teams/neteng/traffic/vault", - addr=vault_addrs["chi1"], - token=token, - dc=dc, - ) - - print(f"consul token: {consul_token}") - print(f"nomad token: {nomad_token}") - print(f"vault token: {vault_token}") - print(f"https://{dc}-vault.simulprod.com/ui/vault/auth?with=token") - - -@click.command() -@click.argument("dc") -def setenv(dc: str): - """Print environment variables for the URL and tokens to various components. - This command can be passed to `export` in order to export variables. - For example: - ``` - export (robloxenv setenv chi1) - ``` - """ - if dc not in valid_dcs: - print(f"{dc} is not a valid data center") - return - - if dc not in vault_addrs: - print(f"{dc} doesn't have an associated vault address") - - token = get_vault_token(vault_addrs[dc]) - print(f"VAULT_ADDR={vault_addrs[dc]}") - print(f"VAULT_TOKEN={token}") - - -@click.group(help="CLI tool to manage environment variables for hashi things at Roblox") -def cli(): - pass - - -cli.add_command(setpop) -cli.add_command(setenv) - -if __name__ == "__main__": - cli() |