La plantilla de Pub/Sub proto a BigQuery es un flujo de procesamiento en streaming que ingiere datos proto desde una suscripción de Pub/Sub y los escribe en una tabla de BigQuery.
Los errores que se producen al escribir en la tabla de BigQuery se envían a un tema de Pub/Sub sin procesar.
Se puede proporcionar una función definida por el usuario (UDF) de JavaScript para transformar los datos. Los errores que se produzcan al ejecutar la UDF se pueden enviar a un tema de Pub/Sub independiente o al mismo tema sin procesar que los errores de BigQuery.
Antes de ejecutar una canalización de Dataflow para este caso, plantéate si una suscripción de Pub/Sub a BigQuery con una UDF se ajusta a tus necesidades.
Requisitos del flujo de procesamiento
- Se debe crear la suscripción de Pub/Sub de entrada.
- Se debe crear el archivo de esquema de los registros de Proto en Cloud Storage.
- Se debe crear el tema de Pub/Sub de salida.
- Se debe crear el conjunto de datos de BigQuery de salida.
- Si la tabla de BigQuery existe, debe tener un esquema que coincida con los datos de proto, independientemente del valor de
createDisposition
.
Parámetros de plantilla
Parámetros necesarios
- protoSchemaPath (ruta de Cloud Storage al archivo de esquema Proto): ruta de Cloud Storage a un archivo de conjunto de descriptores independiente. Por ejemplo: gs://MyBucket/schema.pb.
schema.pb
se puede generar añadiendo--descriptor_set_out=schema.pb
al comandoprotoc
que compila los protos. La marca--include_imports
se puede usar para garantizar que el archivo sea independiente. - fullMessageName (nombre completo del mensaje Proto): nombre completo del mensaje (por ejemplo, package.name.MessageName). Si el mensaje está anidado en otro mensaje, incluye todos los mensajes con el delimitador "." (por ejemplo, package.name.OuterMessage.InnerMessage). "package.name" debe proceder de la instrucción
package
, no de la instrucciónjava_package
. - inputSubscription (suscripción de entrada de Pub/Sub): suscripción de Pub/Sub para leer la entrada, con el formato "projects/your-project-id/subscriptions/your-subscription-name" (por ejemplo, projects/your-project-id/subscriptions/your-subscription-name).
- outputTableSpec (tabla de salida de BigQuery): ubicación de la tabla de BigQuery en la que se escribirán los resultados. El nombre debe tener el formato
<project>:<dataset>.<table_name>
. El esquema de la tabla debe coincidir con los objetos de entrada. - outputTopic (tema de Pub/Sub de salida): el nombre del tema en el que se deben publicar los datos, con el formato "projects/your-project-id/topics/your-topic-name" (por ejemplo, projects/your-project-id/topics/your-topic-name).
Parámetros opcionales
- preserveProtoFieldNames (Conservar nombres de campos de proto): marca para controlar si los nombres de los campos de proto se deben conservar o convertir a lowerCamelCase. Si la tabla ya existe, debe basarse en lo que coincida con el esquema de la tabla. De lo contrario, determinará los nombres de las columnas de la tabla creada. True para conservar el formato snake_case de proto. Si es false, los campos se convertirán a lowerCamelCase. Valor predeterminado: false.
- bigQueryTableSchemaPath (Ruta del esquema de la tabla de BigQuery): ruta de Cloud Storage al archivo JSON del esquema de BigQuery. Si no se define, el esquema se deduce del esquema Proto. Por ejemplo, gs://MyBucket/bq_schema.json.
- udfOutputTopic (tema de salida de Pub/Sub para errores de FDU): tema de salida opcional para enviar errores de FDU. Si no se define esta opción, los errores se escribirán en el mismo tema que los errores de BigQuery. Por ejemplo, projects/your-project-id/topics/your-topic-name.
- writeDisposition (Write Disposition to use for BigQuery): WriteDisposition de BigQuery. Por ejemplo, WRITE_APPEND, WRITE_EMPTY o WRITE_TRUNCATE. El valor predeterminado es WRITE_APPEND.
- createDisposition (Create Disposition to use for BigQuery): CreateDisposition de BigQuery. Por ejemplo, CREATE_IF_NEEDED o CREATE_NEVER. El valor predeterminado es CREATE_IF_NEEDED.
- javascriptTextTransformGcsPath (ruta de Cloud Storage al origen de la FDU de JavaScript): patrón de ruta de Cloud Storage del código JavaScript que contiene las funciones definidas por el usuario. Por ejemplo, gs://your-bucket/your-function.js.
- javascriptTextTransformFunctionName (nombre de la función JavaScript UDF): nombre de la función a la que se llamará desde el archivo JavaScript. Usa solo letras, dígitos y guiones bajos. Por ejemplo, "transform" o "transform_udf1".
- javascriptTextTransformReloadIntervalMinutes (intervalo de recarga automática de la UDF de JavaScript [minutos]): define el intervalo en el que los workers pueden comprobar si hay cambios en la UDF de JavaScript para recargar los archivos. El valor predeterminado es 0.
- useStorageWriteApi (Usar la API Storage Write de BigQuery): si es true, la canalización usa la API Storage Write al escribir los datos en BigQuery (consulta https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api). El valor predeterminado es false. Cuando se usa la API Storage Write en el modo de entrega exactamente una vez, debe definir los parámetros "Número de flujos de la API Storage Write de BigQuery" y "Frecuencia de activación en segundos de la API Storage Write de BigQuery". Si habilitas el modo al menos una vez de Dataflow o asignas el valor "true" al parámetro useStorageWriteApiAtLeastOnce, no tendrás que definir el número de flujos ni la frecuencia de activación.
- useStorageWriteApiAtLeastOnce (Usar la semántica de al menos una vez en la API Storage Write de BigQuery): este parámetro solo tiene efecto si la opción "Usar la API Storage Write de BigQuery" está habilitada. Si está habilitada, se usará la semántica "al menos una vez" para la API Storage Write. De lo contrario, se usará la semántica "exactamente una vez". Valor predeterminado: false.
- numStorageWriteApiStreams número de flujos de la API Storage Write de BigQuery. El número de flujos define el paralelismo de la transformación Write de BigQueryIO y se corresponde aproximadamente con el número de flujos de la API Storage Write que usará la canalización. Consulta los valores recomendados en https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api. El valor predeterminado es 0.
- storageWriteApiTriggeringFrequencySec (frecuencia de activación en segundos de la API Storage Write de BigQuery): la frecuencia de activación determinará cuándo se podrán consultar los datos en BigQuery. Consulta los valores recomendados en https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api.
Función definida por el usuario
También puedes ampliar esta plantilla escribiendo una función definida por el usuario (UDF). La plantilla llama a la función definida por el usuario para cada elemento de entrada. Las cargas útiles de los elementos se serializan como cadenas JSON. Para obtener más información, consulta el artículo sobre cómo crear funciones definidas por el usuario para plantillas de Dataflow.
Especificación de la función
La función definida por el usuario tiene las siguientes especificaciones:
Ejecutar la plantilla
Consola
- Ve a la página Crear tarea a partir de plantilla de Dataflow. Ir a Crear tarea a partir de plantilla
- En el campo Nombre de la tarea, introduce un nombre único.
- Opcional: En Endpoint regional, seleccione un valor en el menú desplegable. La región predeterminada es
us-central1
.Para ver una lista de las regiones en las que puedes ejecutar una tarea de Dataflow, consulta Ubicaciones de Dataflow.
- En el menú desplegable Plantilla de flujo de datos, seleccione the Pub/Sub Proto to BigQuery template.
- En los campos de parámetros proporcionados, introduzca los valores de los parámetros.
- Haz clic en Ejecutar trabajo.
gcloud
En tu shell o terminal, ejecuta la plantilla:
gcloud dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Flex \ --parameters \ schemaPath=SCHEMA_PATH,\ fullMessageName=PROTO_MESSAGE_NAME,\ inputSubscription=SUBSCRIPTION_NAME,\ outputTableSpec=BIGQUERY_TABLE,\ outputTopic=UNPROCESSED_TOPIC
Haz los cambios siguientes:
JOB_NAME
: un nombre de trabajo único que elijasREGION_NAME
: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo,us-central1
VERSION
: la versión de la plantilla que quieres usarPuedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta principal sin fecha del contenedor: gs://dataflow-templates-REGION_NAME/latest/- el nombre de la versión, como
2023-09-12-00_RC00
, para usar una versión específica de la plantilla, que se encuentra anidada en la carpeta principal correspondiente con la fecha en el bucket: gs://dataflow-templates-REGION_NAME/
SCHEMA_PATH
: la ruta de Cloud Storage al archivo de esquema Proto (por ejemplo,gs://MyBucket/file.pb
)PROTO_MESSAGE_NAME
: el nombre del mensaje Proto (por ejemplo,package.name.MessageName
)SUBSCRIPTION_NAME
: el nombre de la suscripción de entrada de Pub/SubBIGQUERY_TABLE
: nombre de la tabla de salida de BigQueryUNPROCESSED_TOPIC
: el tema de Pub/Sub que se va a usar para la cola sin procesar.
API
Para ejecutar la plantilla mediante la API REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus ámbitos de autorización, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Flex", "parameters": { "schemaPath": "SCHEMA_PATH", "fullMessageName": "PROTO_MESSAGE_NAME", "inputSubscription": "SUBSCRIPTION_NAME", "outputTableSpec": "BIGQUERY_TABLE", "outputTopic": "UNPROCESSED_TOPIC" } } }
Haz los cambios siguientes:
PROJECT_ID
: el ID del proyecto Google Cloud en el que quieres ejecutar la tarea de DataflowJOB_NAME
: un nombre de trabajo único que elijasLOCATION
: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo,us-central1
VERSION
: la versión de la plantilla que quieres usarPuedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta principal sin fecha del contenedor: gs://dataflow-templates-REGION_NAME/latest/- el nombre de la versión, como
2023-09-12-00_RC00
, para usar una versión específica de la plantilla, que se encuentra anidada en la carpeta principal correspondiente con la fecha en el bucket: gs://dataflow-templates-REGION_NAME/
SCHEMA_PATH
: la ruta de Cloud Storage al archivo de esquema Proto (por ejemplo,gs://MyBucket/file.pb
)PROTO_MESSAGE_NAME
: el nombre del mensaje Proto (por ejemplo,package.name.MessageName
)SUBSCRIPTION_NAME
: el nombre de la suscripción de entrada de Pub/SubBIGQUERY_TABLE
: nombre de la tabla de salida de BigQueryUNPROCESSED_TOPIC
: el tema de Pub/Sub que se va a usar para la cola sin procesar.
Siguientes pasos
- Consulta información sobre las plantillas de Dataflow.
- Consulta la lista de plantillas proporcionadas por Google.