Transmite de Pub/Sub a BigQuery


En este instructivo, se usa la plantilla Suscripción de Pub/Sub a BigQuery para crear y ejecutar un trabajo de plantilla de Dataflow con la Google Cloud consola o Google Cloud CLI. En el instructivo, se explica un ejemplo de canalización de transmisión que lee mensajes codificados en JSON desde Pub/Sub y los escribe en una tabla de BigQuery.

Las canalizaciones de integración de datos y estadísticas de transmisiones usan Pub/Sub para transferir y distribuir datos. Pub/Sub te permite crear sistemas de productores y consumidores de eventos, llamados publicadores y suscriptores. Los publicadores envían eventos al servicio de Pub/Sub de forma asíncrona, y Pub/Sub entrega los eventos a todos los servicios que necesitan reaccionar ante ellos.

Dataflow es un servicio completamente administrado para transformar y enriquecer datos en modos de transmisión (en tiempo real) y por lotes. Proporciona un entorno de desarrollo de canalización simplificado que usa el SDK de Apache Beam para transformar los datos entrantes y generar los datos transformados.

El beneficio de este flujo de trabajo es que puedes usar UDFs para transformar los datos del mensaje antes de que se escriban en BigQuery.

Antes de ejecutar una canalización de Dataflow para este caso de uso, considera si una suscripción a BigQuery de Pub/Sub con una UDF satisface tus requisitos.

Objetivos

  • Cree un tema de Pub/Sub.
  • Crea un conjunto de datos de BigQuery con una tabla y un esquema.
  • Usa una plantilla de transmisión proporcionada por Google para transmitir datos desde tu suscripción de Pub/Sub a BigQuery mediante Dataflow.

Costos

En este documento, usarás los siguientes componentes facturables de Google Cloud:

  • Dataflow
  • Pub/Sub
  • Cloud Storage
  • BigQuery

Para generar una estimación de costos en función del uso previsto, usa la calculadora de precios.

Es posible que los usuarios Google Cloud nuevos cumplan con los requisitos para obtener una prueba gratuita.

Cuando finalices las tareas que se describen en este documento, puedes borrar los recursos que creaste para evitar que continúe la facturación. Para obtener más información, consulta Cómo realizar una limpieza.

Antes de comenzar

En esta sección, se muestra cómo seleccionar un proyecto, habilitar las APIs y otorgar los roles adecuados a tu cuenta de usuario y a la cuenta de servicio de trabajador.

Console

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  6. Make sure that billing is enabled for your Google Cloud project.

  7. Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs.

    Enable the APIs

  8. Para completar los pasos de este instructivo, la cuenta de usuario debe tener el rol de usuario de la cuenta de servicio. La cuenta de servicio predeterminada de Compute Engine debe tener los siguientes roles: Trabajador de Dataflow, Administrador de Dataflow, Editor de Pub/Sub. Administrador de objetos de almacenamiento y Editor de datos de BigQuery. Para agregar los roles necesarios en la consola de Google Cloud , haz lo siguiente:

    1. En la consola de Google Cloud , ve a la página IAM.

      Ir a IAM
    2. Elige tu proyecto.
    3. En la fila que contiene tu cuenta de usuario, haz clic en Editar principal y, luego, en Agregar otro rol.
    4. En la lista desplegable, selecciona el rol Usuario de cuenta de servicio.
    5. En la fila que contiene la cuenta de servicio predeterminada de Compute Engine, haz clic en Editar principal y, luego, en Agregar otro rol.
    6. En la lista desplegable, selecciona el rol Trabajador de Dataflow.
    7. Repite los pasos para los roles de Administrador de Dataflow, Editor de Pub/Sub, Administrador de objeto de almacenamiento y Editor de datos de BigQuery y, luego, haz clic en Guardar.

      Para obtener más información sobre cómo otorgar roles, consulta Otorga un rol de IAM mediante la consola.

gcloud

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  4. To initialize the gcloud CLI, run the following command:

    gcloud init
  5. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Make sure that billing is enabled for your Google Cloud project.

  7. Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com logging.googleapis.com bigquery.googleapis.com pubsub.googleapis.com storage.googleapis.com cloudresourcemanager.googleapis.com
  8. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  9. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  10. Install the Google Cloud CLI.

  11. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  12. To initialize the gcloud CLI, run the following command:

    gcloud init
  13. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  14. Make sure that billing is enabled for your Google Cloud project.

  15. Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com logging.googleapis.com bigquery.googleapis.com pubsub.googleapis.com storage.googleapis.com cloudresourcemanager.googleapis.com
  16. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  17. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  18. Otorga roles a tu cuenta de servicio predeterminada de Compute Engine. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de IAM:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.admin
    • roles/pubsub.editor
    • roles/bigquery.dataEditor
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

    Reemplaza lo siguiente:

    • PROJECT_ID: Es el ID de tu proyecto.
    • PROJECT_NUMBER: Es el número de tu proyecto. Para encontrar el número de tu proyecto, usa el comando gcloud projects describe.
    • SERVICE_ACCOUNT_ROLE: Es cada rol individual.

Crea un bucket de Cloud Storage

Primero, crea un bucket de Cloud Storage con la consola de Google Cloud o Google Cloud CLI. La canalización de Dataflow usa este bucket como ubicación de almacenamiento temporal.

Console

  1. En la Google Cloud consola, ve a la página Buckets de Cloud Storage.

    Ir a Buckets

  2. Haz clic en Crear.

  3. En la página Crear un bucket, en Nombre del bucket, ingresa un nombre que cumpla con los requisitos de nombres de buckets. Los nombres de buckets de Cloud Storage deben ser únicos a nivel global. No selecciones las otras opciones.

  4. Haz clic en Crear.

gcloud

Usa el comando gcloud storage buckets create:

gcloud storage buckets create gs://BUCKET_NAME

Reemplaza BUCKET_NAME por un nombre para tu bucket de Cloud Storage que cumpla con los requisitos de nombres de buckets. Los nombres de buckets de Cloud Storage deben ser únicos a nivel global.

Crea un tema y una suscripción de Pub/Sub

Crea un tema de Pub/Sub y, luego, crea una suscripción a ese tema.

Console

Para crear un tema, completa los siguientes pasos:

  1. En la consola de Google Cloud , ve a la página Temas de Pub/Sub.

    Ir a temas

  2. Haga clic en Crear tema.

  3. En el campo ID de tema, ingresa un ID para tu tema. Si deseas obtener información sobre cómo nombrar un tema, consulta Lineamientos para asignar un nombre a un tema o una suscripción.

  4. Mantén la opción Agregar una suscripción predeterminada. No selecciones las otras opciones.

  5. Haz clic en Crear.

  6. En la página de detalles del tema, el nombre de la suscripción que se creó aparece en ID de suscripción. Anota este valor para los pasos posteriores.

gcloud

Para crear un tema, ejecuta el comando gcloud pubsub topics create. Para obtener información sobre cómo asignar un nombre a una suscripción, consulta los Lineamientos para asignar un nombre a un tema o una suscripción.

gcloud pubsub topics create TOPIC_ID

Reemplaza TOPIC_ID por un nombre para tu tema de Pub/Sub.

Para crear una suscripción a un tema, ejecuta el comando gcloud pubsub subscriptions create:

gcloud pubsub subscriptions create --topic TOPIC_ID SUBSCRIPTION_ID

Reemplaza SUBSCRIPTION_ID por un nombre para tu suscripción a Pub/Sub.

Crea una tabla de BigQuery

En este paso, crearás una tabla de BigQuery con el siguiente esquema:

Nombre de la columna Tipo de datos
name STRING
customer_id INTEGER

Si aún no tienes un conjunto de datos de BigQuery, primero crea uno. Para obtener más información, consulta Crea conjuntos de datos. Luego, crea una tabla vacía nueva:

Console

  1. Ve a la página de BigQuery.

    Ir a BigQuery

  2. En el panel Explorador, expande tu proyecto y, luego, elige un conjunto de datos.

  3. En la sección Información del conjunto de datos, haz clic en Crear tabla.

  4. En la lista Crear tabla desde, selecciona Tabla vacía.

  5. En el cuadro Tabla, ingresa el nombre de la tabla.

  6. En la sección Schema, haz clic en Edit as text.

  7. Pega la siguiente definición de esquema:

    name:STRING,
    customer_id:INTEGER
    
  8. Haz clic en Crear tabla.

gcloud

Usa el comando bq mk.

bq mk --table \
  PROJECT_ID:DATASET_NAME.TABLE_NAME \
  name:STRING,customer_id:INTEGER

Reemplaza lo siguiente:

  • PROJECT_ID: El ID de tu proyecto.
  • DATASET_NAME: El nombre del conjunto de datos.
  • TABLE_NAME: el nombre de la tabla que se creará

Ejecuta la canalización

Ejecuta una canalización de transmisión con la plantilla Suscripción de Pub/Sub a BigQuery proporcionada por Google. La canalización obtiene datos entrantes del tema de Pub/Sub y los envía a tu conjunto de datos de BigQuery.

Console

  1. En la consola de Google Cloud , ve a la página Trabajos de Dataflow.

    Ir a Trabajos

  2. Haz clic en Create job from template (Crear un trabajo a partir de una plantilla).

  3. En Job name (Nombre del trabajo), ingresa un nombre para el trabajo de Dataflow.

  4. En Extremo regional, selecciona una región para tu trabajo de Dataflow.

  5. En Plantilla de Dataflow, selecciona la plantilla Suscripción de Pub/Sub a BigQuery.

  6. En Tabla de salida de BigQuery, selecciona Explorar y elige tu tabla de BigQuery.

  7. En la lista Suscripción de entrada de Pub/Sub, selecciona la suscripción de Pub/Sub.

  8. En Ubicación temporal, ingresa lo siguiente:

    gs://BUCKET_NAME/temp/
    

    Reemplaza BUCKET_NAME por el nombre de tu depósito de Cloud Storage. La carpeta temp almacena archivos temporales para los trabajos de Dataflow.

  9. Haga clic en Ejecutar trabajo.

gcloud

Para ejecutar la plantilla en tu shell o terminal, usa el comando gcloud dataflow jobs run.

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-DATAFLOW_REGION/latest/PubSub_Subscription_to_BigQuery \
    --region DATAFLOW_REGION \
    --staging-location gs://BUCKET_NAME/temp \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID,\
outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME

Reemplaza las siguientes variables:

  • JOB_NAME: Es el nombre del trabajo.
  • DATAFLOW_REGION: Es una región para el trabajo.
  • PROJECT_ID: Nombre de tu proyecto Google Cloud
  • SUBSCRIPTION_ID: Es el nombre de tu suscripción a Pub/Sub.
  • DATASET_NAME: Es el nombre de tu conjunto de datos de BigQuery.
  • TABLE_NAME: Es el nombre de tu tabla de BigQuery.

Publica mensajes en Pub/Sub

Después de que se inicie el trabajo de Dataflow, puedes publicar mensajes en Pub/Sub, y la canalización los escribirá en BigQuery.

Console

  1. En la consola de Google Cloud , ve a la página Temas de Pub/Sub.

    Ir a temas

  2. En la lista de temas, haz clic en el nombre de tu tema.

  3. Haz clic en Mensajes.

  4. Haz clic en Publicar mensajes.

  5. En Cantidad de mensajes, ingresa 10.

  6. En Cuerpo del mensaje, ingresa {"name": "Alice", "customer_id": 1}.

  7. Haz clic en Publicar.

gcloud

Para publicar mensajes en tu tema, usa el comando gcloud pubsub topics publish.

for run in {1..10}; do
  gcloud pubsub topics publish TOPIC_ID --message='{"name": "Alice", "customer_id": 1}'
done

Reemplaza TOPIC_ID por el nombre del tema.

Ve los resultados

Visualiza los datos escritos en tu tabla de BigQuery. Los datos pueden tardar hasta un minuto en comenzar a aparecer en la tabla.

Console

  1. En la consola de Google Cloud , ve a la página BigQuery.
    Ir a la página de BigQuery

  2. En el Editor de consultas, ejecute la siguiente consulta:

    SELECT * FROM `PROJECT_ID.DATASET_NAME.TABLE_NAME`
    LIMIT 1000
    

    Reemplaza las siguientes variables:

    • PROJECT_ID: El nombre de tu proyecto Google Cloud
    • DATASET_NAME: Es el nombre de tu conjunto de datos de BigQuery.
    • TABLE_NAME: Es el nombre de tu tabla de BigQuery.

gcloud

Ejecuta la siguiente consulta para verificar los resultados en BigQuery:

bq query --use_legacy_sql=false 'SELECT * FROM `PROJECT_ID.DATASET_NAME.TABLE_NAME`'

Reemplaza las siguientes variables:

  • PROJECT_ID: El nombre de tu proyecto Google Cloud
  • DATASET_NAME: Es el nombre de tu conjunto de datos de BigQuery.
  • TABLE_NAME: Es el nombre de tu tabla de BigQuery.

Usar una UDF para transformar los datos

En este instructivo, se supone que los mensajes de Pub/Sub tienen formato JSON y que el esquema de la tabla de BigQuery coincide con los datos JSON.

De forma opcional, puedes proporcionar una función definida por el usuario (UDF) de JavaScript que transforme los datos antes de que se escriban en BigQuery. La UDF puede realizar un procesamiento adicional, como filtrar, quitar información de identificación personal (PII) o enriquecer los datos con campos adicionales.

Para obtener más información, consulta Crea funciones definidas por el usuario para plantillas de Dataflow.

Usa una tabla de mensajes no entregados

Mientras se ejecuta el trabajo, es posible que la canalización no pueda escribir mensajes individuales en BigQuery. Los errores posibles incluyen los siguientes:

  • Errores de serialización, incluido el JSON con formato incorrecto
  • Errores de conversión de tipo, causados por una falta de coincidencia en el esquema de la tabla y los datos JSON.
  • Campos adicionales en los datos JSON que no están presentes en el esquema de la tabla.

La canalización escribe estos errores en una tabla de mensajes no entregados en BigQuery. De forma predeterminada, la canalización crea automáticamente una tabla de mensajes no entregados llamada TABLE_NAME_error_records, en la que TABLE_NAME es el nombre de la tabla de salida. Para usar un nombre diferente, configura el parámetro de plantilla outputDeadletterTable.

Realiza una limpieza

Para evitar que se apliquen cargos a tu cuenta de Google Cloud por los recursos usados en este instructivo, borra el proyecto que contiene los recursos o conserva el proyecto y borra los recursos individuales.

Borra el proyecto

La manera más fácil de eliminar la facturación es borrar el Google Cloud proyecto que creaste para el instructivo.

Console

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

gcloud

    Delete a Google Cloud project:

    gcloud projects delete PROJECT_ID

Borra los recursos individuales

Si quieres reutilizar el proyecto más adelante, puedes conservar el proyecto, pero borrar los recursos que creaste durante el instructivo.

Detén la canalización de Dataflow

Console

  1. En la consola de Google Cloud , ve a la página Trabajos de Dataflow.

    Ir a Trabajos

  2. Haz clic en el trabajo que deseas detener.

    Para detener un trabajo, su estado debe ser En ejecución.

  3. En la página de detalles del trabajo, haz clic en Detener.

  4. Haz clic en Cancelar.

  5. Para confirmar tu elección, haz clic en Detener trabajo.

gcloud

Para cancelar tu trabajo de Dataflow, usa el comando gcloud dataflow jobs.

gcloud dataflow jobs list \
  --filter 'NAME=JOB_NAME AND STATE=Running' \
  --format 'value(JOB_ID)' \
  --region "DATAFLOW_REGION" \
  | xargs gcloud dataflow jobs cancel --region "DATAFLOW_REGION"

Limpia los Google Cloud recursos del proyecto

Console

  1. Borra el tema y la suscripción de Pub/Sub.

    1. Ve a la página Temas de Pub/Sub en la consola de Google Cloud .

      Ir a temas

    2. Selecciona el tema que creaste.

    3. Haz clic en Borrar para borrar el tema de forma definitiva.

    4. Ve a la página Suscripciones de Pub/Sub en la consola de Google Cloud .

      Ir a las suscripciones

    5. Selecciona la suscripción que se creó con tu tema.

    6. Haz clic en Borrar para borrar la suscripción de forma definitiva.

  2. Borra la tabla y el conjunto de datos de BigQuery.

    1. En la consola de Google Cloud , ve a la página BigQuery.

      Ir a BigQuery

    2. En el panel Explorador, expande tu proyecto.

    3. Junto al conjunto de datos que deseas borrar, haz clic en Ver acciones y, luego, en borrar.

  3. Borra el bucket de Cloud Storage.

    1. En la Google Cloud consola, ve a la página Buckets de Cloud Storage.

      Ir a Buckets

    2. Selecciona el bucket que deseas borrar, haz clic en Borrar y sigue las instrucciones.

gcloud

  1. Para borrar la suscripción y el tema de Pub/Sub, usa los comandos gcloud pubsub subscriptions delete y gcloud pubsub topics delete.

    gcloud pubsub subscriptions delete SUBSCRIPTION_ID
    gcloud pubsub topics delete TOPIC_ID
    
  2. Para borrar la tabla de BigQuery, usa el comando bq rm.

    bq rm -f -t PROJECT_ID:tutorial_dataset.tutorial
    
  3. Borra el conjunto de datos de BigQuery. El conjunto de datos por sí solo no genera cargos.

    bq rm -r -f -d PROJECT_ID:tutorial_dataset
    
  4. Para borrar el bucket de Cloud Storage y sus objetos, usa el comando gcloud storage rm. El bucket por sí solo no genera cargos.

    gcloud storage rm gs://BUCKET_NAME --recursive
    

Revoca credenciales

Console

Si conservas tu proyecto, revoca los roles que otorgaste a la cuenta de servicio predeterminada de Compute Engine.

  1. En la consola de Google Cloud , ve a la página IAM.

Ir a IAM

  1. Selecciona un proyecto, una carpeta o una organización.

  2. Busca la fila que contiene el principal cuyo acceso deseas revocar. En esa fila, haz clic en Editar principal.

  3. Haz clic en el botón Borrar para cada rol que desees revocar y, luego, haz clic en Guardar.

gcloud

  • Si conservas tu proyecto, revoca los roles que otorgaste a la cuenta de servicio predeterminada de Compute Engine. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de IAM:
    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.admin
    • roles/pubsub.editor
    • roles/bigquery.dataEditor
      gcloud projects remove-iam-policy-binding <var>PROJECT_ID</var> \
      --member=serviceAccount:<var>PROJECT_NUMBER</var>-compute@developer.gserviceaccount.com \
      --role=<var>ROLE</var>
    

  • Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  • Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

¿Qué sigue?