"""
Example of using the OS Login API to apply public SSH keys for a service
account, and use that service account to run commands on a remote
instance over SSH. This example uses zonal DNS names to address instances
on the same internal VPC network.
"""
from __future__ import annotations
import argparse
import subprocess
import time
from typing import Optional
import uuid
from google.cloud import oslogin_v1
import requests
SERVICE_ACCOUNT_METADATA_URL = (
    "http://metadata.google.internal/computeMetadata/v1/instance/"
    "service-accounts/default/email"
)
HEADERS = {"Metadata-Flavor": "Google"}
def execute(
    cmd: list[str],
    cwd: Optional[str] = None,
    capture_output: bool = False,
    env: Optional[dict] = None,
    raise_errors: bool = True,
) -> tuple[int, str]:
    """
    Run an external command (wrapper for Python subprocess).
    Args:
        cmd: The command to be run.
        cwd: Directory in which to run the command.
        capture_output: Should the command output be captured and returned or just ignored.
        env: Environmental variables passed to the child process.
        raise_errors: Should errors in run command raise exceptions.
    Returns:
        Return code and captured output.
    """
    print(f"Running command: {cmd}")
    process = subprocess.run(
        cmd,
        cwd=cwd,
        stdout=subprocess.PIPE if capture_output else subprocess.DEVNULL,
        stderr=subprocess.STDOUT,
        text=True,
        env=env,
        check=raise_errors,
    )
    output = process.stdout
    returncode = process.returncode
    if returncode:
        print(f"Command returned error status {returncode}")
        if capture_output:
            print(f"With output: {output}")
    return returncode, output
def create_ssh_key(
    oslogin_client: oslogin_v1.OsLoginServiceClient,
    account: str,
    expire_time: int = 300,
) -> str:
    """
    Generates a temporary SSH key pair and apply it to the specified account.
    Args:
        oslogin_client: OS Login client object.
        account: Name of the service account this key will be assigned to.
            This should be in form of `user/<service_account_username>`.
        expire_time: How many seconds from now should this key be valid.
    Returns:
        The path to private SSH key. Public key can be found by appending `.pub`
        to the file name.
    """
    private_key_file = f"/tmp/key-{uuid.uuid4()}"
    execute(["ssh-keygen", "-t", "rsa", "-N", "", "-f", private_key_file])
    with open(f"{private_key_file}.pub") as original:
        public_key = original.read().strip()
    # Expiration time is in microseconds.
    expiration = int((time.time() + expire_time) * 1000000)
    request = oslogin_v1.ImportSshPublicKeyRequest()
    request.parent = account
    request.ssh_public_key.key = public_key
    request.ssh_public_key.expiration_time_usec = expiration
    print(f"Setting key for {account}...")
    oslogin_client.import_ssh_public_key(request)
    # Let the key properly propagate
    time.sleep(5)
    return private_key_file
def run_ssh(cmd: str, private_key_file: str, username: str, hostname: str) -> str:
    """
    Runs a command on a remote system.
    Args:
        cmd: command to be run.
        private_key_file: private SSH key to be used for authentication.
        username: username to be used for authentication.
        hostname: hostname of the machine you want to run the command on.
    Returns:
        Output of the executed command.
    """
    ssh_command = [
        "ssh",
        "-i",
        private_key_file,
        "-o",
        "StrictHostKeyChecking=no",
        "-o",
        "UserKnownHostsFile=/dev/null",
        f"{username}@{hostname}",
        cmd,
    ]
    print(f"Running ssh command: {' '.join(ssh_command)}")
    tries = 0
    while tries < 3:
        try:
            ssh = subprocess.run(
                ssh_command,
                shell=False,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                check=True,
                env={"SSH_AUTH_SOCK": ""},
                timeout=10,
            )
        except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as err:
            time.sleep(10)
            tries += 1
            if tries == 3:
                if isinstance(err, subprocess.CalledProcessError):
                    print(
                        f"Failed to run SSH command (return code: {err.returncode}. Output received: {err.output}"
                    )
                else:
                    print("Failed to run SSH - timed out.")
                raise err
        else:
            return ssh.stdout
def main(
    cmd: str,
    project: str,
    instance: Optional[str] = None,
    zone: Optional[str] = None,
    account: Optional[str] = None,
    hostname: Optional[str] = None,
    oslogin: Optional[oslogin_v1.OsLoginServiceClient] = None,
) -> str:
    """
    Runs a command on a remote system.
    Args:
        cmd: command to be executed on the remote host.
        project: name of the project in which te remote instance is hosted.
        instance: name of the remote system instance.
        zone: zone in which the remote system resides. I.e. us-west3-a
        account: account to be used for authentication.
        hostname: hostname of the remote system.
        oslogin: OSLogin service client object. If not provided, a new client will be created.
    Returns:
        The commands output.
    """
    # Create the OS Login API object.
    if oslogin is None:
        oslogin = oslogin_v1.OsLoginServiceClient()
    # Identify the service account ID if it is not already provided.
    account = (
        account or requests.get(SERVICE_ACCOUNT_METADATA_URL, headers=HEADERS).text
    )
    if not account.startswith("users/"):
        account = f"users/{account}"
    # Create a new SSH key pair and associate it with the service account.
    private_key_file = create_ssh_key(oslogin, account)
    try:
        # Using the OS Login API, get the POSIX username from the login profile
        # for the service account.
        profile = oslogin.get_login_profile(name=account)
        username = profile.posix_accounts[0].username
        # Create the hostname of the target instance using the instance name,
        # the zone where the instance is located, and the project that owns the
        # instance.
        hostname = hostname or f"{instance}.{zone}.c.{project}.internal"
        # Run a command on the remote instance over SSH.
        result = run_ssh(cmd, private_key_file, username, hostname)
        # Print the command line output from the remote instance.
        print(result)
        return result
    finally:
        # Shred the private key and delete the pair.
        execute(["shred", private_key_file])
        execute(["rm", private_key_file])
        execute(["rm", f"{private_key_file}.pub"])
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
        "--cmd", default="uname -a", help="The command to run on the remote instance."
    )
    parser.add_argument("--project", help="Your Google Cloud project ID.")
    parser.add_argument("--zone", help="The zone where the target instance is located.")
    parser.add_argument("--instance", help="The target instance for the ssh command.")
    parser.add_argument("--account", help="The service account email.")
    parser.add_argument(
        "--hostname",
        help="The external IP address or hostname for the target instance.",
    )
    args = parser.parse_args()
    main(
        args.cmd,
        args.project,
        instance=args.instance,
        zone=args.zone,
        account=args.account,
        hostname=args.hostname,
    )