Clientseitige Verschlüsselung

Auf dieser Seite wird beschrieben, wie die clientseitige Verschlüsselung in Cloud SQL implementiert wird.

Übersicht

Bei der clientseitigen Verschlüsselung werden Daten vor dem Schreiben in Cloud SQL verschlüsselt. Sie können Cloud SQL-Daten so verschlüsseln, dass nur Ihre Anwendung sie entschlüsseln kann.

Zum Aktivieren der clientseitigen Verschlüsselung haben Sie folgende Möglichkeiten:

  1. Mit einem im Cloud Key Management Service (Cloud KMS) gespeicherten Verschlüsselungsschlüssel.
  2. Mit einem lokal in Ihrer Anwendung gespeicherten Verschlüsselungsschlüssel.

In diesem Thema wird beschrieben, wie Sie die erste Option verwenden, die die einfachste Möglichkeit zur Schlüsselverwaltung bietet. Wir erstellen einen Verschlüsselungsschlüssel in Cloud KMS und implementieren die Umschlagverschlüsselung mithilfe von Tink, der Open-Source-Kryptografiebibliothek von Google.

Warum ist eine clientseitige Verschlüsselung erforderlich?

Sie benötigen clientseitige Verschlüsselung, wenn Sie die Cloud SQL-Daten auf Spaltenebene 1 schützen möchten. Angenommen, Sie haben eine Tabelle mit Namen und Kreditkartennummern. Sie möchten einem Nutzer Zugriff auf diese Tabelle gewähren, er soll jedoch nicht die Kreditkartennummern sehen. Sie können die Nummern mit der clientseitigen Verschlüsselung verschlüsseln. Solange dem Nutzer kein Zugriff auf den Verschlüsselungsschlüssel in Cloud KMS gewährt wurde, kann er die Kreditkartendaten nicht lesen.

Schlüssel mit Cloud KMS erstellen

Mit Cloud KMS können Sie Schlüssel auf der Google Cloud Platform erstellen und verwalten.

Cloud KMS unterstützt viele verschiedene Schlüsseltypen. Für die clientseitige Verschlüsselung müssen Sie einen symmetrischen Schlüssel erstellen.

Um Ihrer Anwendung Zugriff auf den Schlüssel in Cloud KMS zu gewähren, müssen Sie dem Dienstkonto, das Ihre Anwendung verwendet, die Rolle cloudkms.cryptoKeyEncrypterDecrypter zuweisen. In gcloud verwenden Sie den folgenden Befehl:

gcloud kms keys add-iam-policy-binding key \
--keyring=key-ring \
--location=location \
--member=serviceAccount:service-account-name@example.domain.com \
--role=roles/cloudkms.cryptoKeyEncrypterDecrypter

Sie können den KMS-Schlüssel zwar verwenden, um Daten direkt zu verschlüsseln, hier verwenden wir aber eine flexiblere Lösung namens Umschlagverschlüsselung. Auf diese Weise können Nachrichten, die länger als 64 KB sind, verschlüsselt werden. Dies ist die maximale Nachrichtengröße, die von der Cloud Key Management Service API unterstützt wird.

Cloud KMS-Umschlagverschlüsselung

Bei der Umschlagverschlüsselung fungiert der KMS-Schlüssel als KEK. Das heißt, er wird zum Verschlüsseln von Datenverschlüsselungsschlüsseln (Data Encryption Keys, DEKs) verwendet, die wiederum zum Verschlüsseln tatsächlicher Daten verwendet werden.

Nachdem Sie einen KEK in Cloud KMS erstellt haben, müssen Sie zum Verschlüsseln jeder Nachricht Folgendes tun:

  • Generieren Sie lokal einen Datenverschlüsselungsschlüssel (Data Encryption Key, DEK).
  • Verwenden Sie diesen DEK lokal zum Verschlüsseln der Nachricht.
  • Rufen Sie Cloud KMS auf, um den DEK mit dem KEK zu verschlüsseln (zusammenzufassen).
  • Speichern Sie die verschlüsselten Daten und den zusammengefassten DEK.

In diesem Thema verwenden wir Tink, statt eine völlig neue Umschlag-Verschlüsselung zu implementieren.

Tink

Tink ist eine plattformübergreifende mehrsprachige Bibliothek, die hochwertige kryptografische APIs bereitstellt. Um Daten mit der Umschlagverschlüsselung von Tink zu verschlüsseln, stellen Sie Tink einen Schlüssel-URI bereit, der auf Ihren KEK in Cloud KMS verweist, und Anmeldedaten, mit denen Tink den KEK verwenden kann. Tink generiert den DEK, verschlüsselt die Daten, verpackt den DEK und gibt einen einzelnen Geheimtext mit den verschlüsselten Daten und dem verpackten DEK zurück.

Tink unterstützt die Umschlagverschlüsselung in C++, Java, Go und Python mithilfe der AEAD API:

public interface Aead{
  byte[] encrypt(final byte[] plaintext, final byte[] associatedData)
  throws
  byte[] decrypt(final byte[] ciphertext, final byte[] associatedData)
  throws
}

Neben dem normalen Argument „message/ciphertext” unterstützen die Methoden zum Verschlüsseln und Entschlüsseln optional verknüpfte Daten. Dieses Argument kann verwendet werden, um den Geheimtext mit einem Datenelement zu verknüpfen. Angenommen, Sie haben eine Datenbank mit dem Feld user-id und dem Feld encrypted-medical-history. In diesem Fall sollte das Feld user-id beim Verschlüsseln des medizinischen Verlaufs als verknüpfte Daten verwendet werden. Dadurch wird verhindert, dass ein Angreifer einen medizinischen Verlauf von einem Nutzer zu einem anderen verschieben kann. Außerdem wird bei der Ausführung einer Abfrage überprüft, ob Sie die richtige Datenzeile haben.

Beispiele

In diesem Abschnitt wird anhand eines Beispielcodes eine Wählerdatenbank mit clientseitiger Verschlüsselung verwendet. Der Beispielcode zeigt, wie Pub/Sub-Ereignisse in einem Dienst gelesen werden, der in Cloud Run (vollständig verwaltet) bereitgestellt wird.

  • Datenbanktabelle und Verbindungspool erstellen
  • Tink für die Umschlagverschlüsselung einrichten
  • Daten mithilfe der Umschlagverschlüsselung von Tink mit einem KEK in Cloud KMS verschlüsseln und entschlüsseln

Hinweis

  1. Erstellen Sie mithilfe dieser Anleitung eine Cloud SQL-Instanz. Notieren Sie den Verbindungsstring, den Datenbanknutzer und das Datenbankpasswort, das Sie erstellen.

  2. Erstellen Sie eine Datenbank für Ihre Anwendung. Folgen Sie dazu dieser Anleitung. Notieren Sie sich den Namen der Datenbank.

  3. Erstellen Sie entsprechend dieser Anleitung einen KMS-Schlüssel für Ihre Anwendung. Kopieren Sie den Ressourcennamen des erstellten Schlüssels.

  4. Erstellen Sie ein Dienstkonto mit den „Cloud SQL-Client”-Berechtigungen, indem Sie dieser Anleitung folgen.

  5. Fügen Sie Ihrem Dienstkonto die Berechtigung "Cloud KMS CryptoKey-Verschlüsseler/Entschlüsseler" für den Schlüssel hinzu. Folgen Sie dazu dieser Anleitung.

Erstellen Sie einen Verbindungspool und eine neue Tabelle in der Datenbank.

Java


import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import javax.sql.DataSource;

public class CloudSqlConnectionPool {

  public static DataSource createConnectionPool(String dbUser, String dbPass, String dbName,
      String instanceConnectionName) {
    HikariConfig config = new HikariConfig();
    config.setJdbcUrl(String.format("jdbc:mysql:///%s", dbName));
    config.setUsername(dbUser);
    config.setPassword(dbPass);
    config.addDataSourceProperty("socketFactory", "com.google.cloud.sql.mysql.SocketFactory");
    config.addDataSourceProperty("cloudSqlInstance", instanceConnectionName);
    DataSource pool = new HikariDataSource(config);
    return pool;
  }

  public static void createTable(DataSource pool, String tableName) throws SQLException {
    // Safely attempt to create the table schema.
    try (Connection conn = pool.getConnection()) {
      String stmt = String.format("CREATE TABLE IF NOT EXISTS %s ( "
          + "vote_id SERIAL NOT NULL, time_cast timestamp NOT NULL, team CHAR(6) NOT NULL,"
          + "voter_email VARBINARY(255), PRIMARY KEY (vote_id) );", tableName);
      try (PreparedStatement createTableStatement = conn.prepareStatement(stmt);) {
        createTableStatement.execute();
      }
    }
  }
}

Python

import sqlalchemy


def init_tcp_connection_engine(
    db_user: str, db_pass: str, db_name: str, db_host: str
) -> sqlalchemy.engine.base.Engine:
    """
    Creates a connection to the database using tcp socket.
    """
    # Remember - storing secrets in plaintext is potentially unsafe. Consider using
    # something like https://cloud.google.com/secret-manager/docs/overview to help keep
    # secrets secret.

    # Extract host and port from db_host
    host_args = db_host.split(":")
    db_hostname, db_port = host_args[0], int(host_args[1])

    pool = sqlalchemy.create_engine(
        # Equivalent URL:
        # mysql+pymysql://<db_user>:<db_pass>@<db_host>:<db_port>/<db_name>
        sqlalchemy.engine.url.URL.create(
            drivername="mysql+pymysql",
            username=db_user,  # e.g. "my-database-user"
            password=db_pass,  # e.g. "my-database-password"
            host=db_hostname,  # e.g. "127.0.0.1"
            port=db_port,  # e.g. 3306
            database=db_name,  # e.g. "my-database-name"
        ),
    )
    print("Created TCP connection pool")
    return pool


def init_unix_connection_engine(
    db_user: str,
    db_pass: str,
    db_name: str,
    instance_connection_name: str,
    db_socket_dir: str,
) -> sqlalchemy.engine.base.Engine:
    """
    Creates a connection to the database using unix socket.
    """
    # Remember - storing secrets in plaintext is potentially unsafe. Consider using
    # something like https://cloud.google.com/secret-manager/docs/overview to help keep
    # secrets secret.

    pool = sqlalchemy.create_engine(
        # Equivalent URL:
        # mysql+pymysql://<db_user>:<db_pass>@/<db_name>?unix_socket=<socket_path>/<cloud_sql_instance_name>
        sqlalchemy.engine.url.URL.create(
            drivername="mysql+pymysql",
            username=db_user,  # e.g. "my-database-user"
            password=db_pass,  # e.g. "my-database-password"
            database=db_name,  # e.g. "my-database-name"
            query={"unix_socket": f"{db_socket_dir}/{instance_connection_name}"},
        ),
    )
    print("Created Unix socket connection pool")
    return pool


def init_db(
    db_user: str,
    db_pass: str,
    db_name: str,
    table_name: str,
    instance_connection_name: str = None,
    db_socket_dir: str = None,
    db_host: str = None,
) -> sqlalchemy.engine.base.Engine:
    """Starts a connection to the database and creates voting table if it doesn't exist."""
    if db_host:
        db = init_tcp_connection_engine(db_user, db_pass, db_name, db_host)
    else:
        db = init_unix_connection_engine(
            db_user, db_pass, db_name, instance_connection_name, db_socket_dir
        )

    # Create tables (if they don't already exist)
    with db.connect() as conn:
        conn.execute(
            f"CREATE TABLE IF NOT EXISTS {table_name} "
            "( vote_id SERIAL NOT NULL, time_cast timestamp NOT NULL, "
            "team CHAR(6) NOT NULL, voter_email VARBINARY(255), "
            "PRIMARY KEY (vote_id) );"
        )

    print(f"Created table {table_name} in db {db_name}")
    return db

Initialisieren Sie eine Umschlag-AEAD-Primitive mit Tink.

Java


import com.google.crypto.tink.Aead;
import com.google.crypto.tink.KmsClient;
import com.google.crypto.tink.aead.AeadConfig;
import com.google.crypto.tink.aead.AeadKeyTemplates;
import com.google.crypto.tink.aead.KmsEnvelopeAead;
import com.google.crypto.tink.integration.gcpkms.GcpKmsClient;
import java.security.GeneralSecurityException;

public class CloudKmsEnvelopeAead {

  public static Aead get(String kmsUri) throws GeneralSecurityException {
    AeadConfig.register();

    // Create a new KMS Client
    KmsClient client = new GcpKmsClient().withDefaultCredentials();

    // Create an AEAD primitive using the Cloud KMS key
    Aead gcpAead = client.getAead(kmsUri);

    // Create an envelope AEAD primitive.
    // This key should only be used for client-side encryption to ensure authenticity and integrity
    // of data.
    return new KmsEnvelopeAead(AeadKeyTemplates.AES128_GCM, gcpAead);
  }
}

Python

import logging

import tink
from tink import aead
from tink.integration import gcpkms

logger = logging.getLogger(__name__)


def init_tink_env_aead(key_uri: str, credentials: str) -> tink.aead.KmsEnvelopeAead:
    """
    Initiates the Envelope AEAD object using the KMS credentials.
    """
    aead.register()

    try:
        gcp_client = gcpkms.GcpKmsClient(key_uri, credentials)
        gcp_aead = gcp_client.get_aead(key_uri)
    except tink.TinkError as e:
        logger.error("Error initializing GCP client: %s", e)
        raise e

    # Create envelope AEAD primitive using AES256 GCM for encrypting the data
    # This key should only be used for client-side encryption to ensure authenticity and integrity
    # of data.
    key_template = aead.aead_key_templates.AES256_GCM
    env_aead = aead.KmsEnvelopeAead(key_template, gcp_aead)

    print(f"Created envelope AEAD Primitive using KMS URI: {key_uri}")

    return env_aead

Verschlüsseln Sie Daten und fügen Sie sie in die Datenbank ein.

Java


import com.google.crypto.tink.Aead;
import java.security.GeneralSecurityException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Date;
import javax.sql.DataSource;

public class EncryptAndInsertData {

  public static void main(String[] args) throws GeneralSecurityException, SQLException {
    // Saving credentials in environment variables is convenient, but not secure - consider a more
    // secure solution such as Cloud Secret Manager to help keep secrets safe.
    String dbUser = System.getenv("DB_USER"); // e.g. "root", "mysql"
    String dbPass = System.getenv("DB_PASS"); // e.g. "mysupersecretpassword"
    String dbName = System.getenv("DB_NAME"); // e.g. "votes_db"
    String instanceConnectionName =
        System.getenv("INSTANCE_CONNECTION_NAME"); // e.g. "project-name:region:instance-name"
    String kmsUri = System.getenv("CLOUD_KMS_URI"); // e.g. "gcp-kms://projects/...path/to/key
    // Tink uses the "gcp-kms://" prefix for paths to keys stored in Google Cloud KMS. For more
    // info on creating a KMS key and getting its path, see
    // https://cloud.google.com/kms/docs/quickstart

    String team = "TABS";
    String tableName = "votes";
    String email = "hello@example.com";

    // Initialize database connection pool and create table if it does not exist
    // See CloudSqlConnectionPool.java for setup details
    DataSource pool =
        CloudSqlConnectionPool.createConnectionPool(dbUser, dbPass, dbName, instanceConnectionName);
    CloudSqlConnectionPool.createTable(pool, tableName);

    // Initialize envelope AEAD
    // See CloudKmsEnvelopeAead.java for setup details
    Aead envAead = CloudKmsEnvelopeAead.get(kmsUri);

    encryptAndInsertData(pool, envAead, tableName, team, email);
  }

  public static void encryptAndInsertData(
      DataSource pool, Aead envAead, String tableName, String team, String email)
      throws GeneralSecurityException, SQLException {

    try (Connection conn = pool.getConnection()) {
      String stmt =
          String.format(
              "INSERT INTO %s (team, time_cast, voter_email) VALUES (?, ?, ?);", tableName);
      try (PreparedStatement voteStmt = conn.prepareStatement(stmt); ) {
        voteStmt.setString(1, team);
        voteStmt.setTimestamp(2, new Timestamp(new Date().getTime()));

        // Use the envelope AEAD primitive to encrypt the email, using the team name as
        // associated data. This binds the encryption of the email to the team name, preventing
        // associating an encrypted email in one row with a team name in another row.
        byte[] encryptedEmail = envAead.encrypt(email.getBytes(), team.getBytes());
        voteStmt.setBytes(3, encryptedEmail);

        // Finally, execute the statement. If it fails, an error will be thrown.
        voteStmt.execute();
        System.out.println(String.format("Successfully inserted row into table %s", tableName));
      }
    }
  }
}

Python

import datetime
import logging
import os

import sqlalchemy
import tink

from .cloud_kms_env_aead import init_tink_env_aead
from .cloud_sql_connection_pool import init_db

logger = logging.getLogger(__name__)


def main() -> None:
    """
    Connects to the database, encrypts and inserts some data.
    """
    db_user = os.environ["DB_USER"]  # e.g. "root", "mysql"
    db_pass = os.environ["DB_PASS"]  # e.g. "mysupersecretpassword"
    db_name = os.environ["DB_NAME"]  # e.g. "votes_db"

    # Set if connecting using TCP:
    db_host = os.environ["DB_HOST"]  # e.g. "127.0.0.1"

    # Set if connecting using Unix sockets:
    db_socket_dir = os.environ.get("DB_SOCKET_DIR", "/cloudsql")

    instance_connection_name = os.environ["INSTANCE_CONNECTION_NAME"]
    # e.g. "project-name:region:instance-name"

    credentials = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS", "")
    key_uri = "gcp-kms://" + os.environ["GCP_KMS_URI"]
    # e.g. "gcp-kms://projects/...path/to/key
    # Tink uses the "gcp-kms://" prefix for paths to keys stored in Google
    # Cloud KMS. For more info on creating a KMS key and getting its path, see
    # https://cloud.google.com/kms/docs/quickstart

    table_name = "votes"
    team = "TABS"
    email = "hello@example.com"

    env_aead = init_tink_env_aead(key_uri, credentials)
    db = init_db(
        db_user,
        db_pass,
        db_name,
        table_name,
        instance_connection_name,
        db_socket_dir,
        db_host,
    )

    encrypt_and_insert_data(db, env_aead, table_name, team, email)


def encrypt_and_insert_data(
    db: sqlalchemy.engine.base.Engine,
    env_aead: tink.aead.KmsEnvelopeAead,
    table_name: str,
    team: str,
    email: str,
) -> None:
    """
    Inserts a vote into the database with email address previously encrypted using
    a KmsEnvelopeAead object.
    """
    time_cast = datetime.datetime.now(tz=datetime.timezone.utc)
    # Use the envelope AEAD primitive to encrypt the email, using the team name as
    # associated data. Encryption with associated data ensures authenticity
    # (who the sender is) and integrity (the data has not been tampered with) of that
    # data, but not its secrecy. (see RFC 5116 for more info)
    encrypted_email = env_aead.encrypt(email.encode(), team.encode())
    # Verify that the team is one of the allowed options
    if team != "TABS" and team != "SPACES":
        logger.error(f"Invalid team specified: {team}")
        return

    # Preparing a statement before hand can help protect against injections.
    stmt = sqlalchemy.text(
        f"INSERT INTO {table_name} (time_cast, team, voter_email)"
        " VALUES (:time_cast, :team, :voter_email)"
    )

    # Using a with statement ensures that the connection is always released
    # back into the pool at the end of statement (even if an error occurs)
    with db.connect() as conn:
        conn.execute(stmt, time_cast=time_cast, team=team, voter_email=encrypted_email)
    print(f"Vote successfully cast for '{team}' at time {time_cast}!")

Fragen Sie die Datenbank ab und entschlüsseln Sie die gespeicherten Daten.

Java


import com.google.crypto.tink.Aead;
import java.security.GeneralSecurityException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import javax.sql.DataSource;

public class QueryAndDecryptData {

  public static void main(String[] args) throws GeneralSecurityException, SQLException {
    // Saving credentials in environment variables is convenient, but not secure - consider a more
    // secure solution such as Cloud Secret Manager to help keep secrets safe.
    String dbUser = System.getenv("DB_USER"); // e.g. "root", "mysql"
    String dbPass = System.getenv("DB_PASS"); // e.g. "mysupersecretpassword"
    String dbName = System.getenv("DB_NAME"); // e.g. "votes_db"
    String instanceConnectionName =
        System.getenv("INSTANCE_CONNECTION_NAME"); // e.g. "project-name:region:instance-name"
    String kmsUri = System.getenv("CLOUD_KMS_URI"); // e.g. "gcp-kms://projects/...path/to/key
    // Tink uses the "gcp-kms://" prefix for paths to keys stored in Google Cloud KMS. For more
    // info on creating a KMS key and getting its path, see
    // https://cloud.google.com/kms/docs/quickstart

    String tableName = "votes123";

    // Initialize database connection pool and create table if it does not exist
    // See CloudSqlConnectionPool.java for setup details
    DataSource pool =
        CloudSqlConnectionPool.createConnectionPool(dbUser, dbPass, dbName, instanceConnectionName);
    CloudSqlConnectionPool.createTable(pool, tableName);

    // Initialize envelope AEAD
    // See CloudKmsEnvelopeAead.java for setup details
    Aead envAead = CloudKmsEnvelopeAead.get(kmsUri);

    // Insert row into table to test
    // See EncryptAndInsert.java for setup details
    EncryptAndInsertData.encryptAndInsertData(
        pool, envAead, tableName, "SPACES", "hello@example.com");

    queryAndDecryptData(pool, envAead, tableName);
  }

  public static void queryAndDecryptData(DataSource pool, Aead envAead, String tableName)
      throws GeneralSecurityException, SQLException {

    try (Connection conn = pool.getConnection()) {
      String stmt =
          String.format(
              "SELECT team, time_cast, voter_email FROM %s ORDER BY time_cast DESC LIMIT 5",
              tableName);
      try (PreparedStatement voteStmt = conn.prepareStatement(stmt); ) {
        ResultSet voteResults = voteStmt.executeQuery();

        System.out.println("Team\tTime Cast\tEmail");
        while (voteResults.next()) {
          String team = voteResults.getString(1);
          Timestamp timeCast = voteResults.getTimestamp(2);

          // Use the envelope AEAD primitive to encrypt the email, using the team name as
          // associated data. This binds the encryption of the email to the team name, preventing
          // associating an encrypted email in one row with a team name in another row.
          String email = new String(envAead.decrypt(voteResults.getBytes(3), team.getBytes()));

          System.out.println(String.format("%s\t%s\t%s", team, timeCast, email));
        }
      }
    }
  }
}

Python

import os

import sqlalchemy
import tink

from .cloud_kms_env_aead import init_tink_env_aead
from .cloud_sql_connection_pool import init_db
from .encrypt_and_insert_data import encrypt_and_insert_data


def main() -> None:
    """
    Connects to the database, inserts encrypted data and retrieves encrypted data.
    """
    db_user = os.environ["DB_USER"]  # e.g. "root", "mysql"
    db_pass = os.environ["DB_PASS"]  # e.g. "mysupersecretpassword"
    db_name = os.environ["DB_NAME"]  # e.g. "votes_db"

    # Set if connecting using TCP:
    db_host = os.environ["DB_HOST"]  # e.g. "127.0.0.1"

    # Set if connecting using Unix sockets:
    db_socket_dir = os.environ.get("DB_SOCKET_DIR", "/cloudsql")

    instance_connection_name = os.environ["INSTANCE_CONNECTION_NAME"]
    # e.g. "project-name:region:instance-name"

    credentials = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS", "")
    key_uri = "gcp-kms://" + os.environ["GCP_KMS_URI"]
    # e.g. "gcp-kms://projects/...path/to/key
    # Tink uses the "gcp-kms://" prefix for paths to keys stored in Google
    # Cloud KMS. For more info on creating a KMS key and getting its path, see
    # https://cloud.google.com/kms/docs/quickstart

    table_name = "votes"
    team = "TABS"
    email = "hello@example.com"

    env_aead = init_tink_env_aead(key_uri, credentials)
    db = init_db(
        db_user,
        db_pass,
        db_name,
        table_name,
        instance_connection_name,
        db_socket_dir,
        db_host,
    )

    encrypt_and_insert_data(db, env_aead, table_name, team, email)
    query_and_decrypt_data(db, env_aead, table_name)


def query_and_decrypt_data(
    db: sqlalchemy.engine.base.Engine,
    env_aead: tink.aead.KmsEnvelopeAead,
    table_name: str,
) -> list[tuple[str]]:
    """
    Retrieves data from the database and decrypts it using the KmsEnvelopeAead object.
    """
    with db.connect() as conn:
        # Execute the query and fetch all results
        recent_votes = conn.execute(
            f"SELECT team, time_cast, voter_email FROM {table_name} "
            "ORDER BY time_cast DESC LIMIT 5"
        ).fetchall()

        print("Team\tEmail\tTime Cast")
        output = []

        for row in recent_votes:
            team = row[0]
            # Use the envelope AEAD primitive to decrypt the email, using the team name as
            # associated data. Encryption with associated data ensures authenticity
            # (who the sender is) and integrity (the data has not been tampered with) of that
            # data, but not its secrecy. (see RFC 5116 for more info)
            email = env_aead.decrypt(row[2], team.encode()).decode()
            time_cast = row[1]

            # Print recent votes
            print(f"{team}\t{email}\t{time_cast}")
            output.append((team, email, time_cast))
    return output


  1. Sie können den Zugriff auch auf Instanz- oder Datenbankebene beschränken.