En este instructivo, se usa la plantilla Suscripción de Pub/Sub a BigQuery para crear y ejecutar un trabajo de plantilla de Dataflow con la Google Cloud consola o Google Cloud CLI. En el instructivo, se explica un ejemplo de canalización de transmisión que lee mensajes codificados en JSON desde Pub/Sub y los escribe en una tabla de BigQuery.
Las canalizaciones de integración de datos y estadísticas de transmisiones usan Pub/Sub para transferir y distribuir datos. Pub/Sub te permite crear sistemas de productores y consumidores de eventos, llamados publicadores y suscriptores. Los publicadores envían eventos al servicio de Pub/Sub de forma asíncrona, y Pub/Sub entrega los eventos a todos los servicios que necesitan reaccionar ante ellos.
Dataflow es un servicio completamente administrado para transformar y enriquecer datos en modos de transmisión (en tiempo real) y por lotes. Proporciona un entorno de desarrollo de canalización simplificado que usa el SDK de Apache Beam para transformar los datos entrantes y generar los datos transformados.
El beneficio de este flujo de trabajo es que puedes usar UDFs para transformar los datos del mensaje antes de que se escriban en BigQuery.
Antes de ejecutar una canalización de Dataflow para este caso de uso, considera si una suscripción a BigQuery de Pub/Sub con una UDF satisface tus requisitos.
Objetivos
- Cree un tema de Pub/Sub.
- Crea un conjunto de datos de BigQuery con una tabla y un esquema.
- Usa una plantilla de transmisión proporcionada por Google para transmitir datos desde tu suscripción de Pub/Sub a BigQuery mediante Dataflow.
Costos
En este documento, usarás los siguientes componentes facturables de Google Cloud:
- Dataflow
- Pub/Sub
- Cloud Storage
- BigQuery
Para generar una estimación de costos en función del uso previsto, usa la calculadora de precios.
Cuando finalices las tareas que se describen en este documento, puedes borrar los recursos que creaste para evitar que continúe la facturación. Para obtener más información, consulta Cómo realizar una limpieza.
Antes de comenzar
En esta sección, se muestra cómo seleccionar un proyecto, habilitar las APIs y otorgar los roles adecuados a tu cuenta de usuario y a la cuenta de servicio de trabajador.
Console
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs.
Para completar los pasos de este instructivo, la cuenta de usuario debe tener el rol de usuario de la cuenta de servicio. La cuenta de servicio predeterminada de Compute Engine debe tener los siguientes roles: Trabajador de Dataflow, Administrador de Dataflow, Editor de Pub/Sub. Administrador de objetos de almacenamiento y Editor de datos de BigQuery. Para agregar los roles necesarios en la consola de Google Cloud , haz lo siguiente:
En la consola de Google Cloud , ve a la página IAM.
Ir a IAM- Elige tu proyecto.
- En la fila que contiene tu cuenta de usuario, haz clic en Editar principal y, luego, en Agregar otro rol.
- En la lista desplegable, selecciona el rol Usuario de cuenta de servicio.
- En la fila que contiene la cuenta de servicio predeterminada de Compute Engine, haz clic en Editar principal y, luego, en Agregar otro rol.
- En la lista desplegable, selecciona el rol Trabajador de Dataflow.
Repite los pasos para los roles de Administrador de Dataflow, Editor de Pub/Sub, Administrador de objeto de almacenamiento y Editor de datos de BigQuery y, luego, haz clic en Guardar.
Para obtener más información sobre cómo otorgar roles, consulta Otorga un rol de IAM mediante la consola.
gcloud
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
Install the Google Cloud CLI.
-
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs:
gcloud services enable compute.googleapis.com
dataflow.googleapis.com logging.googleapis.com bigquery.googleapis.com pubsub.googleapis.com storage.googleapis.com cloudresourcemanager.googleapis.com -
If you're using a local shell, then create local authentication credentials for your user account:
gcloud auth application-default login
You don't need to do this if you're using Cloud Shell.
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
-
Install the Google Cloud CLI.
-
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs:
gcloud services enable compute.googleapis.com
dataflow.googleapis.com logging.googleapis.com bigquery.googleapis.com pubsub.googleapis.com storage.googleapis.com cloudresourcemanager.googleapis.com -
If you're using a local shell, then create local authentication credentials for your user account:
gcloud auth application-default login
You don't need to do this if you're using Cloud Shell.
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
-
Otorga roles a tu cuenta de servicio predeterminada de Compute Engine. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de IAM:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.admin
roles/pubsub.editor
roles/bigquery.dataEditor
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
Reemplaza lo siguiente:
PROJECT_ID
: Es el ID de tu proyecto.PROJECT_NUMBER
: Es el número de tu proyecto. Para encontrar el número de tu proyecto, usa el comandogcloud projects describe
.SERVICE_ACCOUNT_ROLE
: Es cada rol individual.
Crea un bucket de Cloud Storage
Primero, crea un bucket de Cloud Storage con la consola de Google Cloud o Google Cloud CLI. La canalización de Dataflow usa este bucket como ubicación de almacenamiento temporal.
Console
En la Google Cloud consola, ve a la página Buckets de Cloud Storage.
Haz clic en Crear.
En la página Crear un bucket, en Nombre del bucket, ingresa un nombre que cumpla con los requisitos de nombres de buckets. Los nombres de buckets de Cloud Storage deben ser únicos a nivel global. No selecciones las otras opciones.
Haz clic en Crear.
gcloud
Usa el comando gcloud storage buckets create
:
gcloud storage buckets create gs://BUCKET_NAME
Reemplaza BUCKET_NAME
por un nombre para tu bucket de Cloud Storage que cumpla con los requisitos de nombres de buckets.
Los nombres de buckets de Cloud Storage deben ser únicos a nivel global.
Crea un tema y una suscripción de Pub/Sub
Crea un tema de Pub/Sub y, luego, crea una suscripción a ese tema.
Console
Para crear un tema, completa los siguientes pasos:
En la consola de Google Cloud , ve a la página Temas de Pub/Sub.
Haga clic en Crear tema.
En el campo ID de tema, ingresa un ID para tu tema. Si deseas obtener información sobre cómo nombrar un tema, consulta Lineamientos para asignar un nombre a un tema o una suscripción.
Mantén la opción Agregar una suscripción predeterminada. No selecciones las otras opciones.
Haz clic en Crear.
- En la página de detalles del tema, el nombre de la suscripción que se creó aparece en ID de suscripción. Anota este valor para los pasos posteriores.
gcloud
Para crear un tema, ejecuta el comando gcloud pubsub topics create
. Para obtener información sobre cómo asignar un nombre a una suscripción, consulta los Lineamientos para asignar un nombre a un tema o una suscripción.
gcloud pubsub topics create TOPIC_ID
Reemplaza TOPIC_ID
por un nombre para tu tema de Pub/Sub.
Para crear una suscripción a un tema, ejecuta el comando gcloud pubsub subscriptions create
:
gcloud pubsub subscriptions create --topic TOPIC_ID SUBSCRIPTION_ID
Reemplaza SUBSCRIPTION_ID
por un nombre para tu suscripción a Pub/Sub.
Crea una tabla de BigQuery
En este paso, crearás una tabla de BigQuery con el siguiente esquema:
Nombre de la columna | Tipo de datos |
---|---|
name |
STRING |
customer_id |
INTEGER |
Si aún no tienes un conjunto de datos de BigQuery, primero crea uno. Para obtener más información, consulta Crea conjuntos de datos. Luego, crea una tabla vacía nueva:
Console
Ve a la página de BigQuery.
En el panel Explorador, expande tu proyecto y, luego, elige un conjunto de datos.
En la sección Información del conjunto de datos, haz clic en
Crear tabla.En la lista Crear tabla desde, selecciona Tabla vacía.
En el cuadro Tabla, ingresa el nombre de la tabla.
En la sección Schema, haz clic en Edit as text.
Pega la siguiente definición de esquema:
name:STRING, customer_id:INTEGER
Haz clic en Crear tabla.
gcloud
Usa el comando bq mk
.
bq mk --table \
PROJECT_ID:DATASET_NAME.TABLE_NAME \
name:STRING,customer_id:INTEGER
Reemplaza lo siguiente:
PROJECT_ID
: El ID de tu proyecto.DATASET_NAME
: El nombre del conjunto de datos.TABLE_NAME
: el nombre de la tabla que se creará
Ejecuta la canalización
Ejecuta una canalización de transmisión con la plantilla Suscripción de Pub/Sub a BigQuery proporcionada por Google. La canalización obtiene datos entrantes del tema de Pub/Sub y los envía a tu conjunto de datos de BigQuery.
Console
En la consola de Google Cloud , ve a la página Trabajos de Dataflow.
Haz clic en Create job from template (Crear un trabajo a partir de una plantilla).
En Job name (Nombre del trabajo), ingresa un nombre para el trabajo de Dataflow.
En Extremo regional, selecciona una región para tu trabajo de Dataflow.
En Plantilla de Dataflow, selecciona la plantilla Suscripción de Pub/Sub a BigQuery.
En Tabla de salida de BigQuery, selecciona Explorar y elige tu tabla de BigQuery.
En la lista Suscripción de entrada de Pub/Sub, selecciona la suscripción de Pub/Sub.
En Ubicación temporal, ingresa lo siguiente:
gs://BUCKET_NAME/temp/
Reemplaza
BUCKET_NAME
por el nombre de tu depósito de Cloud Storage. La carpetatemp
almacena archivos temporales para los trabajos de Dataflow.Haga clic en Ejecutar trabajo.
gcloud
Para ejecutar la plantilla en tu shell o terminal, usa el comando gcloud dataflow jobs run
.
gcloud dataflow jobs run JOB_NAME \
--gcs-location gs://dataflow-templates-DATAFLOW_REGION/latest/PubSub_Subscription_to_BigQuery \
--region DATAFLOW_REGION \
--staging-location gs://BUCKET_NAME/temp \
--parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID,\
outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME
Reemplaza las siguientes variables:
JOB_NAME
: Es el nombre del trabajo.DATAFLOW_REGION
: Es una región para el trabajo.PROJECT_ID
: Nombre de tu proyecto Google CloudSUBSCRIPTION_ID
: Es el nombre de tu suscripción a Pub/Sub.DATASET_NAME
: Es el nombre de tu conjunto de datos de BigQuery.TABLE_NAME
: Es el nombre de tu tabla de BigQuery.
Publica mensajes en Pub/Sub
Después de que se inicie el trabajo de Dataflow, puedes publicar mensajes en Pub/Sub, y la canalización los escribirá en BigQuery.
Console
En la consola de Google Cloud , ve a la página Temas de Pub/Sub.
En la lista de temas, haz clic en el nombre de tu tema.
Haz clic en Mensajes.
Haz clic en Publicar mensajes.
En Cantidad de mensajes, ingresa
10
.En Cuerpo del mensaje, ingresa
{"name": "Alice", "customer_id": 1}
.Haz clic en Publicar.
gcloud
Para publicar mensajes en tu tema, usa el comando gcloud pubsub topics publish
.
for run in {1..10}; do
gcloud pubsub topics publish TOPIC_ID --message='{"name": "Alice", "customer_id": 1}'
done
Reemplaza TOPIC_ID
por el nombre del tema.
Ve los resultados
Visualiza los datos escritos en tu tabla de BigQuery. Los datos pueden tardar hasta un minuto en comenzar a aparecer en la tabla.
Console
En la consola de Google Cloud , ve a la página BigQuery.
Ir a la página de BigQueryEn el Editor de consultas, ejecute la siguiente consulta:
SELECT * FROM `PROJECT_ID.DATASET_NAME.TABLE_NAME` LIMIT 1000
Reemplaza las siguientes variables:
PROJECT_ID
: El nombre de tu proyecto Google CloudDATASET_NAME
: Es el nombre de tu conjunto de datos de BigQuery.TABLE_NAME
: Es el nombre de tu tabla de BigQuery.
gcloud
Ejecuta la siguiente consulta para verificar los resultados en BigQuery:
bq query --use_legacy_sql=false 'SELECT * FROM `PROJECT_ID.DATASET_NAME.TABLE_NAME`'
Reemplaza las siguientes variables:
PROJECT_ID
: El nombre de tu proyecto Google CloudDATASET_NAME
: Es el nombre de tu conjunto de datos de BigQuery.TABLE_NAME
: Es el nombre de tu tabla de BigQuery.
Usar una UDF para transformar los datos
En este instructivo, se supone que los mensajes de Pub/Sub tienen formato JSON y que el esquema de la tabla de BigQuery coincide con los datos JSON.
De forma opcional, puedes proporcionar una función definida por el usuario (UDF) de JavaScript que transforme los datos antes de que se escriban en BigQuery. La UDF puede realizar un procesamiento adicional, como filtrar, quitar información de identificación personal (PII) o enriquecer los datos con campos adicionales.
Para obtener más información, consulta Crea funciones definidas por el usuario para plantillas de Dataflow.
Usa una tabla de mensajes no entregados
Mientras se ejecuta el trabajo, es posible que la canalización no pueda escribir mensajes individuales en BigQuery. Los errores posibles incluyen los siguientes:
- Errores de serialización, incluido el JSON con formato incorrecto
- Errores de conversión de tipo, causados por una falta de coincidencia en el esquema de la tabla y los datos JSON.
- Campos adicionales en los datos JSON que no están presentes en el esquema de la tabla.
La canalización escribe estos errores en una tabla de mensajes no entregados en BigQuery. De forma predeterminada, la canalización crea automáticamente una tabla de mensajes no entregados llamada TABLE_NAME_error_records
, en la que TABLE_NAME
es el nombre de la tabla de salida.
Para usar un nombre diferente, configura el parámetro de plantilla outputDeadletterTable
.
Realiza una limpieza
Para evitar que se apliquen cargos a tu cuenta de Google Cloud por los recursos usados en este instructivo, borra el proyecto que contiene los recursos o conserva el proyecto y borra los recursos individuales.
Borra el proyecto
La manera más fácil de eliminar la facturación es borrar el Google Cloud proyecto que creaste para el instructivo.
Console
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
gcloud
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
Borra los recursos individuales
Si quieres reutilizar el proyecto más adelante, puedes conservar el proyecto, pero borrar los recursos que creaste durante el instructivo.
Detén la canalización de Dataflow
Console
En la consola de Google Cloud , ve a la página Trabajos de Dataflow.
Haz clic en el trabajo que deseas detener.
Para detener un trabajo, su estado debe ser En ejecución.
En la página de detalles del trabajo, haz clic en Detener.
Haz clic en Cancelar.
Para confirmar tu elección, haz clic en Detener trabajo.
gcloud
Para cancelar tu trabajo de Dataflow, usa el comando gcloud dataflow jobs
.
gcloud dataflow jobs list \
--filter 'NAME=JOB_NAME AND STATE=Running' \
--format 'value(JOB_ID)' \
--region "DATAFLOW_REGION" \
| xargs gcloud dataflow jobs cancel --region "DATAFLOW_REGION"
Limpia los Google Cloud recursos del proyecto
Console
Borra el tema y la suscripción de Pub/Sub.
Ve a la página Temas de Pub/Sub en la consola de Google Cloud .
Selecciona el tema que creaste.
Haz clic en Borrar para borrar el tema de forma definitiva.
Ve a la página Suscripciones de Pub/Sub en la consola de Google Cloud .
Selecciona la suscripción que se creó con tu tema.
Haz clic en Borrar para borrar la suscripción de forma definitiva.
Borra la tabla y el conjunto de datos de BigQuery.
En la consola de Google Cloud , ve a la página BigQuery.
En el panel Explorador, expande tu proyecto.
Junto al conjunto de datos que deseas borrar, haz clic en
Ver acciones y, luego, en borrar.
Borra el bucket de Cloud Storage.
En la Google Cloud consola, ve a la página Buckets de Cloud Storage.
Selecciona el bucket que deseas borrar, haz clic en
Borrar y sigue las instrucciones.
gcloud
Para borrar la suscripción y el tema de Pub/Sub, usa los comandos
gcloud pubsub subscriptions delete
ygcloud pubsub topics delete
.gcloud pubsub subscriptions delete SUBSCRIPTION_ID gcloud pubsub topics delete TOPIC_ID
Para borrar la tabla de BigQuery, usa el comando
bq rm
.bq rm -f -t PROJECT_ID:tutorial_dataset.tutorial
Borra el conjunto de datos de BigQuery. El conjunto de datos por sí solo no genera cargos.
bq rm -r -f -d PROJECT_ID:tutorial_dataset
Para borrar el bucket de Cloud Storage y sus objetos, usa el comando
gcloud storage rm
. El bucket por sí solo no genera cargos.gcloud storage rm gs://BUCKET_NAME --recursive
Revoca credenciales
Console
Si conservas tu proyecto, revoca los roles que otorgaste a la cuenta de servicio predeterminada de Compute Engine.
- En la consola de Google Cloud , ve a la página IAM.
Selecciona un proyecto, una carpeta o una organización.
Busca la fila que contiene el principal cuyo acceso deseas revocar. En esa fila, haz clic en
Editar principal.Haz clic en el botón Borrar
para cada rol que desees revocar y, luego, haz clic en Guardar.
gcloud
- Si conservas tu proyecto, revoca los roles que otorgaste a la cuenta de servicio predeterminada de Compute Engine. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de IAM:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.admin
roles/pubsub.editor
roles/bigquery.dataEditor
gcloud projects remove-iam-policy-binding <var>PROJECT_ID</var> \ --member=serviceAccount:<var>PROJECT_NUMBER</var>-compute@developer.gserviceaccount.com \ --role=<var>ROLE</var>
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke
¿Qué sigue?
- Extiende tu plantilla de Dataflow con UDF.
- Obtén más información para usar las plantillas de Dataflow.
- Consulta todas las plantillas proporcionadas por Google.
- Lee sobre el uso de Pub/Sub para crear y usar temas y cómo crear una suscripción de extracción.
- Lee sobre el uso de BigQuery para crear conjuntos de datos.
- Obtén más información sobre las suscripciones a Pub/Sub.
- Explora arquitecturas de referencia, diagramas y prácticas recomendadas sobre Google Cloud. Consulta nuestro Cloud Architecture Center.