Usa BigQuery DataFrames en dbt

dbt (herramienta de compilación de datos) es un framework de línea de comandos de código abierto diseñado para la transformación de datos en almacenes de datos modernos. dbt facilita las transformaciones de datos modulares a través de la creación de modelos reutilizables basados en SQL y Python. La herramienta organiza la ejecución de estas transformaciones dentro del almacén de datos de destino, enfocándose en el paso de transformación de la canalización de ELT. Para obtener más información, consulta la documentación de dbt.

En dbt, un modelo de Python es una transformación de datos que se define y ejecuta con código de Python dentro de tu proyecto de dbt. En lugar de escribir SQL para la lógica de transformación, escribes secuencias de comandos de Python que dbt luego orquesta para ejecutar dentro del entorno del almacén de datos. Un modelo de Python te permite realizar transformaciones de datos que podrían ser complejas o ineficientes para expresarlas en SQL. Esto aprovecha las capacidades de Python y, al mismo tiempo, se beneficia de las funciones de estructura de proyectos, orquestación, administración de dependencias, pruebas y documentación de dbt. Para obtener más información, consulta Modelos de Python.

El adaptador dbt-bigquery admite la ejecución de código de Python definido en BigQuery DataFrames. Esta función está disponible en dbt Cloud y dbt Core. También puedes obtener esta función clonando la versión más reciente del adaptador dbt-bigquery.

Roles obligatorios

El adaptador dbt-bigquery admite la autenticación basada en OAuth y en cuentas de servicio.

Si planeas autenticarte en el adaptador dbt-bigquery con OAuth, pídele a tu administrador que te otorgue los siguientes roles:

Si planeas autenticarte en el adaptador dbt-bigquery con una cuenta de servicio, pídele al administrador que otorgue los siguientes roles a la cuenta de servicio que planeas usar:

Si realizas la autenticación con una cuenta de servicio, asegúrate de tener el rol de usuario de la cuenta de servicio (roles/iam.serviceAccountUser) otorgado para la cuenta de servicio que planeas usar.

Entorno de ejecución de Python

El adaptador dbt-bigquery usa el servicio de ejecutor de notebooks de Colab Enterprise para ejecutar el código de Python de BigQuery DataFrames. El adaptador dbt-bigquery crea y ejecuta automáticamente un notebook de Colab Enterprise para cada modelo de Python. Puedes elegir el proyectoGoogle Cloud en el que deseas ejecutar el notebook. El notebook ejecuta el código de Python del modelo, que la biblioteca de BigQuery DataFrames convierte en SQL de BigQuery. Luego, el SQL de BigQuery se ejecuta en el proyecto configurado. En el siguiente diagrama, se presenta el flujo de control:

Entorno de ejecución de Python de BigQuery DataFrames para un notebook

Si aún no hay una plantilla de notebook disponible en el proyecto y el usuario que ejecuta el código tiene los permisos para crearla, el adaptador dbt-bigquery crea y usa automáticamente la plantilla de notebook predeterminada. También puedes especificar una plantilla de notebook diferente con una configuración de dbt.

La ejecución de notebooks requiere un bucket de Cloud Storage de etapa de pruebas para almacenar el código y los registros. Sin embargo, el adaptador dbt-bigquery copia los registros en los registros de dbt, por lo que no tienes que buscar en el bucket.

Funciones admitidas

El adaptador dbt-bigquery admite las siguientes funciones para los modelos de dbt en Python que ejecutan BigQuery DataFrames:

  • Cargar datos de una tabla de BigQuery existente con la macro dbt.source()
  • Carga datos de otros modelos de dbt con la macro dbt.ref() para compilar dependencias y crear grafos acíclicos dirigidos (DAG) con modelos de Python.
  • Especificar y usar paquetes de Python desde PyPi que se pueden usar con la ejecución de código de Python. Para obtener más información, consulta Configuraciones.
  • Especificar una plantilla de entorno de ejecución de notebook personalizado para tus modelos de BigQuery DataFrames

El adaptador dbt-bigquery admite las siguientes estrategias de materialización:

  • Materialización de tablas, en la que los datos se vuelven a compilar como una tabla en cada ejecución.
  • Materialización incremental con una estrategia de combinación, en la que se agregan datos nuevos o actualizados a una tabla existente, a menudo con una estrategia de combinación para controlar los cambios.

Configura dbt para usar BigQuery DataFrames

Si usas dbt Core, debes usar un archivo profiles.yml para usarlo con BigQuery DataFrames. En el siguiente ejemplo, se usa el método oauth:

your_project_name:
  outputs:
    dev:
      compute_region: us-central1
      dataset: your_bq_dateset
      gcs_bucket: your_gcs_bucket
      job_execution_timeout_seconds: 300
      job_retries: 1
      location: US
      method: oauth
      priority: interactive
      project: your_gcp_project
      threads: 1
      type: bigquery
  target: dev

Si usas dbt Cloud, puedes conectarte a tu plataforma de datos directamente en la interfaz de dbt Cloud. En este caso, no necesitas un archivo profiles.yml. Para obtener más información, consulta Acerca de profiles.yml.

Este es un ejemplo de una configuración a nivel del proyecto para el archivo dbt_project.yml:

# Name your project! Project names should contain only lowercase characters
# and underscores. A good package name should reflect your organization's
# name or the intended use of these models.
name: 'your_project_name'
version: '1.0.0'

# Configuring models
# Full documentation: https://docs.getdbt.com/docs/configuring-models

# In this example config, we tell dbt to build all models in the example/
# directory as views. These settings can be overridden in the individual model
# files using the config(...) macro.

models:
  your_project_name:
    submission_method: bigframes
    notebook_template_id: 7018811640745295872
    packages: ["scikit-learn", "mlflow"]
    timeout: 3000
    # Config indicated by + and applies to all files under models/example/
    example:
      +materialized: view

Algunos parámetros también se pueden configurar con el método dbt.config dentro de tu código de Python. Si estos parámetros de configuración entran en conflicto con tu archivo dbt_project.yml, las configuraciones con dbt.config tendrán prioridad.

Para obtener más información, consulta Parámetros de configuración de modelos y dbt_project.yml.

Configuraciones

Puedes configurar las siguientes opciones con el método dbt.config en tu modelo de Python. Estas configuraciones anulan la configuración a nivel del proyecto.

Configuración Obligatorio Uso
submission_method submission_method=bigframes
notebook_template_id No Si no se especifica, se crea y usa una plantilla predeterminada.
packages No Especifica la lista adicional de paquetes de Python, si es necesario.
timeout No Opcional: Extiende el tiempo de espera de ejecución del trabajo.

Ejemplos de modelos de Python

En las siguientes secciones, se presentan ejemplos de situaciones y modelos de Python.

Carga datos desde una tabla de BigQuery

Para usar datos de una tabla de BigQuery existente como fuente en tu modelo de Python, primero debes definir esta fuente en un archivo YAML. El siguiente ejemplo se define en un archivo source.yml.

version: 2

sources:
  - name: my_project_source   # A custom name for this source group
    database: bigframes-dev   # Your Google Cloud project ID
    schema: yyy_test_us       # The BigQuery dataset containing the table
    tables:
      - name: dev_sql1        # The name of your BigQuery table

Luego, compilas tu modelo de Python, que puede usar las fuentes de datos configuradas en este archivo YAML:

def model(dbt, session):
    # Configure the model to use BigFrames for submission
    dbt.config(submission_method="bigframes")

    # Load data from the 'dev_sql1' table within 'my_project_source'
    source_data = dbt.source('my_project_source', 'dev_sql1')

    # Example transformation: Create a new column 'id_new'
    source_data['id_new'] = source_data['id'] * 10

    return source_data

Cómo hacer referencia a otro modelo

Puedes compilar modelos que dependen del resultado de otros modelos de dbt, como se muestra en el siguiente ejemplo. Esto es útil para crear canalizaciones de datos modulares.

def model(dbt, session):
    # Configure the model to use BigFrames
    dbt.config(submission_method="bigframes")

    # Reference another dbt model named 'dev_sql1'.
    # It assumes you have a model defined in 'dev_sql1.sql' or 'dev_sql1.py'.
    df_from_sql = dbt.ref("dev_sql1")

    # Example transformation on the data from the referenced model
    df_from_sql['id'] = df_from_sql['id'] * 100

    return df_from_sql

Especifica una dependencia de paquete

Si tu modelo de Python requiere bibliotecas de terceros específicas, como MLflow o Boto3, puedes declarar el paquete en la configuración del modelo, como se muestra en el siguiente ejemplo. Estos paquetes se instalan en el entorno de ejecución.

def model(dbt, session):
    # Configure the model for BigFrames and specify required packages
    dbt.config(
        submission_method="bigframes",
        packages=["mlflow", "boto3"]  # List the packages your model needs
    )

    # Import the specified packages for use in your model
    import mlflow
    import boto3

    # Example: Create a DataFrame showing the versions of the imported packages
    data = {
        "mlflow_version": [mlflow.__version__],
        "boto3_version": [boto3.__version__],
        "note": ["This demonstrates accessing package versions after import."]
    }
    bdf = bpd.DataFrame(data)

    return bdf

Especifica una plantilla no predeterminada

Para tener más control sobre el entorno de ejecución o usar una configuración preconfigurada, puedes especificar una plantilla de notebook que no sea predeterminada para tu modelo de BigQuery DataFrames, como se muestra en el siguiente ejemplo.

def model(dbt, session):
    dbt.config(
        submission_method="bigframes",
     # ID of your pre-created notebook template
        notebook_template_id="857350349023451yyyy",
    )

    data = {"int": [1, 2, 3], "str": ['a', 'b', 'c']}
    return bpd.DataFrame(data=data)

Materialización de las tablas

Cuando dbt ejecuta tus modelos de Python, necesita saber cómo guardar los resultados en tu almacén de datos. Esto se denomina materialización.

Para la materialización de tablas estándar, dbt crea o reemplaza por completo una tabla en tu almacén con el resultado de tu modelo cada vez que se ejecuta. Esto se hace de forma predeterminada o configurando de forma explícita la propiedad materialized='table', como se muestra en el siguiente ejemplo.

def model(dbt, session):
    dbt.config(
        submission_method="bigframes",
     # Instructs dbt to create/replace this model as a table
        materialized='table',
    )

    data = {"int_column": [1, 2], "str_column": ['a', 'b']}
    return bpd.DataFrame(data=data)

La materialización incremental con una estrategia de combinación permite que dbt actualice tu tabla solo con filas nuevas o modificadas. Esto es útil para conjuntos de datos grandes, ya que la compilación completa de una tabla cada vez puede ser ineficiente. La estrategia de combinación es una forma común de controlar estas actualizaciones.

Este enfoque integra los cambios de forma inteligente de la siguiente manera:

  • Actualiza las filas existentes que cambiaron.
  • Agregar filas nuevas
  • Opcional, según la configuración: Borrar filas que ya no están presentes en la fuente

Para usar la estrategia de combinación, debes especificar una propiedad unique_key que dbt pueda usar para identificar las filas coincidentes entre el resultado de tu modelo y la tabla existente, como se muestra en el siguiente ejemplo.

def model(dbt, session):
    dbt.config(
        submission_method="bigframes",
        materialized='incremental',
        incremental_strategy='merge',
        unique_key='int',  # Specifies the column to identify unique rows
    )

    # In this example:
    # - Row with 'int' value 1 remains unchanged.
    # - Row with 'int' value 2 has been updated.
    # - Row with 'int' value 4 is a new addition.
    # The 'merge' strategy will ensure that only the updated row ('int 2')
    # and the new row ('int 4') are processed and integrated into the table.
    data = {"int": [1, 2, 4], "str": ['a', 'bbbb', 'd']}
    return bpd.DataFrame(data=data)

Soluciona problemas

Puedes observar la ejecución de Python en los registros de dbt.

Además, puedes ver el código y los registros (incluidas las ejecuciones anteriores) en la página Ejecuciones de Colab Enterprise.

Ir a Ejecuciones de Colab Enterprise

Facturación

Cuando usas el adaptador dbt-bigquery con BigQuery DataFrames, se aplican Google Cloud cargos de lo siguiente:

  • Ejecución de notebooks: Se te cobra por la ejecución del entorno de ejecución de notebooks. Para obtener más información, consulta Precios del entorno de ejecución de Notebook.

  • Ejecución de consultas de BigQuery: En el notebook, BigQuery DataFrames convierte Python a SQL y ejecuta el código en BigQuery. Se te cobrará según la configuración de tu proyecto y tu consulta, como se describe en los precios de BigQuery DataFrames.

Puedes usar la siguiente etiqueta de facturación en la consola de facturación de BigQuery para filtrar el informe de facturación de la ejecución del notebook y de las ejecuciones de BigQuery activadas por el adaptador dbt-bigquery:

  • Etiqueta de ejecución de BigQuery: bigframes-dbt-api

¿Qué sigue?