#!/usr/bin/env python3
"""CLI tool to manage environment variables for hashi things at Roblox.

The tool logs into Vault over LDAP (the LDAP password is read from 1password
through the ``op`` CLI), caches the resulting Vault token on disk until it
expires, and prints the consul/nomad/vault tokens stored in Vault for a POP.
"""

import datetime
import hashlib
import json
import os
import subprocess

import click
import requests

# the ID of the vault in 1password
op_vault = "v4mof5qwozyvob2utdk3crwxnu"

vault_addr_chi1 = "https://chi1-vault.simulprod.com:8200"

# Directory holding one JSON cache file per Vault server.
creds_cache = os.path.join(os.getenv("HOME") or "", ".local/state/rbxenv")

# NOTE(review): "chi11" may be a typo for "chi1" (cf. `vault_addr_chi1`);
# the list is not used in this file, so it is left untouched -- confirm.
valid_dcs = ["ash1", "chi11"]

valid_edges = [
    "ams1",
    "atl1",
    "dwf1",
    "fra2",
    "hkg1",
    "iad1",
    "lax1",
    "lga1",
    "lhr1",
    "mia1",
    "nrt1",
    "ord1",
    "ord2",
    "sjfc1",
]


def _path_cached_file(val: str) -> str:
    """Return the path of the cache file associated with ``val``.

    The file name is the SHA-256 hex digest of ``val`` (the URL of the vault
    server), so each server gets its own file inside ``creds_cache``.
    """
    digest = hashlib.sha256(val.encode("utf-8")).hexdigest()
    return os.path.join(creds_cache, f"{digest}.json")


def get_ldap_password() -> str:
    """Return the LDAP password.

    The password is expected to be in 1password, under `LDAP'.

    Raises:
        click.ClickException: if the ``op`` CLI cannot be executed or exits
            with a non-zero status.
    """
    # Argument list instead of a shell string: no shell is involved and the
    # child process is always reaped (os.popen() left the pipe open).
    cmd = ["/usr/local/bin/op", "read", f"op://{op_vault}/LDAP/password"]
    try:
        result = subprocess.run(
            cmd, stdout=subprocess.PIPE, text=True, check=True
        )
    except (OSError, subprocess.CalledProcessError) as e:
        raise click.ClickException(
            f"could not read the LDAP password from 1password: {e}"
        ) from e
    return result.stdout.rstrip()


def _get_token_from_cache(addr: str) -> str | None:
    """Return the token from the cache if it is still valid.

    Returns ``None`` when there is no cache file for ``addr``, when the file
    cannot be read or parsed, or when the cached token has expired.
    """
    cached_path = _path_cached_file(addr)
    try:
        with open(cached_path, encoding="utf-8") as f:
            d = json.load(f)
        until = int(d["until"])
        token = d["token"]
    except (OSError, ValueError, KeyError, TypeError):
        # Missing, unreadable or corrupt cache: treat it as a cache miss so
        # the caller simply logs in again and rewrites the file.
        return None

    if datetime.datetime.now().timestamp() > until:
        return None
    return token


def _set_token_to_cache(addr: str, token: str, expires_after: datetime.datetime):
    """Set the token in the cache.

    The cache also contains the time after which the token won't be valid
    anymore. The file holds a secret, so it is only readable by its owner.
    """
    os.makedirs(creds_cache, mode=0o700, exist_ok=True)

    cache = {
        "until": int(expires_after.timestamp()),
        "token": token,
    }

    cached_path = _path_cached_file(addr)
    fd = os.open(cached_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as f:
        # The mode passed to os.open() only applies to newly created files;
        # tighten the permissions of a pre-existing cache file as well.
        os.chmod(cached_path, 0o600)
        json.dump(cache, f)


def _vault_login(addr: str) -> str:
    """Log into vault to get a token.

    If we get a token, we store it in the cache so we don't need to request
    it again until it expires.

    Raises:
        click.ClickException: if ``USER`` is not set, the LDAP password
            cannot be read, or the login request fails.
    """
    ldap_username = os.getenv("USER")
    if not ldap_username:
        raise click.ClickException(
            "USER is not set; cannot determine the LDAP username"
        )
    ldap_password = get_ldap_password()

    url = f"{addr}/v1/auth/ldap/login/{ldap_username}"
    obj = {"method": "push", "password": ldap_password}
    try:
        resp = requests.post(url, json=obj)
        resp.raise_for_status()
    except requests.RequestException as e:
        # Stop here: there is no usable response to read a token from.
        raise click.ClickException(f"{url} returned {e}") from e

    auth = resp.json()["auth"]
    expires_after = datetime.datetime.now() + datetime.timedelta(
        seconds=auth["lease_duration"]
    )
    token = auth["client_token"]
    _set_token_to_cache(addr, token, expires_after)
    return token


def get_vault_token(addr: str) -> str:
    """Get the token for vault.

    The cached token is used when it is still valid; otherwise a new login
    is performed.
    """
    token = _get_token_from_cache(addr)
    if token is None:
        token = _vault_login(addr)
    return token


def vault_read(path: str, addr: str, token: str, dc: str) -> str:
    """Read some values from a path in vault.

    Args:
        path: path of the secret, appended to ``{addr}/v1/``.
        addr: base URL of the vault server.
        token: vault token sent in the ``x-vault-token`` header.
        dc: key to return from the ``data`` object of the response.

    Returns:
        The value stored under ``dc`` in the response's ``data`` object.

    Raises:
        click.ClickException: if the request fails.
        KeyError: if the response has no ``data`` entry for ``dc``.
    """
    url = f"{addr}/v1/{path}"
    headers = {"x-vault-token": token}
    try:
        resp = requests.get(url, headers=headers)
        resp.raise_for_status()
    except requests.RequestException as e:
        # Stop here: there is no usable response to read the data from.
        raise click.ClickException(f"{url} returned {e}") from e

    return resp.json()["data"][dc]


@click.command()
@click.argument("dc")
def setpop(dc: str):
    """Print some information regarding hashi stack in a POP."""
    # The docstring above is the help text click shows; keep it short.
    if dc not in valid_edges:
        # Reject unknown POPs up front instead of querying vault for them.
        raise click.BadParameter(
            f"{dc!r} is not a known POP "
            f"(expected one of: {', '.join(valid_edges)})",
            param_hint="dc",
        )

    token = get_vault_token(vault_addr_chi1)

    # Read everything first so nothing is printed if one of the reads fails.
    tokens = {
        name: vault_read(
            f"secret/teams/neteng/traffic/{name}",
            addr=vault_addr_chi1,
            token=token,
            dc=dc,
        )
        for name in ("consul", "nomad", "vault")
    }

    for name, value in tokens.items():
        print(f"{name} token: {value}")
    print(f"https://{dc}-vault.simulprod.com/ui/vault/auth?with=token")


@click.group(
    help="CLI tool to manage environment variables for hashi things at Roblox"
)
def cli():
    """Top-level command group; sub-commands are registered below."""


cli.add_command(setpop)

if __name__ == "__main__":
    cli()