La plantilla de Pub/Sub Avro a BigQuery es una canalización de streaming que ingiere datos de Avro desde una suscripción de Pub/Sub y los escribe en una tabla de BigQuery. Los errores que se producen al escribir en la tabla de BigQuery se envían a un tema de mensajes no procesados de Pub/Sub.
Antes de ejecutar una canalización de Dataflow para este caso, plantéate si una suscripción de Pub/Sub a BigQuery con una UDF se ajusta a tus necesidades.
Requisitos del flujo de procesamiento
- Se debe crear la suscripción de Pub/Sub de entrada.
- Se debe crear el archivo de esquema de los registros de Avro en Cloud Storage.
- Se debe crear el tema de Pub/Sub sin procesar.
- Se debe crear el conjunto de datos de BigQuery de salida.
Parámetros de plantilla
Parámetros obligatorios
- schemaPath la ubicación en Cloud Storage del archivo de esquema de Avro. Por ejemplo,
gs://path/to/my/schema.avsc
. - inputSubscription la suscripción de entrada de Pub/Sub desde la que se leerán los datos. Por ejemplo,
projects/<PROJECT_ID>/subscription/<SUBSCRIPTION_ID>
. - outputTableSpec la ubicación de la tabla de salida de BigQuery en la que se escribirán los resultados. Por ejemplo,
<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>
.En función delcreateDisposition
especificado, la tabla de salida se puede crear automáticamente con el esquema Avro proporcionado por el usuario. - outputTopic el tema de Pub/Sub que se va a usar para los registros sin procesar. Por ejemplo,
projects/<PROJECT_ID>/topics/<TOPIC_NAME>
.
Parámetros opcionales
- useStorageWriteApiAtLeastOnce al usar la API Storage Write, especifica la semántica de escritura. Para usar la semántica de al menos una vez (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), asigna el valor "true" a este parámetro. Para usar la semántica de entrega única, asigna el valor
false
al parámetro. Este parámetro solo se aplica cuandouseStorageWriteApi
estrue
. El valor predeterminado esfalse
. - writeDisposition valor de WriteDisposition de BigQuery (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload). Por ejemplo,
WRITE_APPEND
,WRITE_EMPTY
oWRITE_TRUNCATE
. El valor predeterminado esWRITE_APPEND
. - createDisposition el valor de CreateDisposition de BigQuery (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload). Por ejemplo,
CREATE_IF_NEEDED
yCREATE_NEVER
. El valor predeterminado esCREATE_IF_NEEDED
. - useStorageWriteApi si es true, la canalización usa la API Storage Write de BigQuery (https://cloud.google.com/bigquery/docs/write-api). El valor predeterminado es
false
. Para obtener más información, consulta el artículo sobre cómo usar la API Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api). - numStorageWriteApiStreams cuando se usa la API Storage Write, especifica el número de flujos de escritura. Si
useStorageWriteApi
estrue
yuseStorageWriteApiAtLeastOnce
esfalse
, debe definir este parámetro. El valor predeterminado es 0. - storageWriteApiTriggeringFrequencySec cuando se usa la API Storage Write, especifica la frecuencia de activación en segundos. Si
useStorageWriteApi
estrue
yuseStorageWriteApiAtLeastOnce
esfalse
, debe definir este parámetro.
Ejecutar la plantilla
Consola
- Ve a la página Crear tarea a partir de plantilla de Dataflow. Ir a Crear tarea a partir de plantilla
- En el campo Nombre de la tarea, introduce un nombre único.
- Opcional: En Endpoint regional, seleccione un valor en el menú desplegable. La región predeterminada es
us-central1
.Para ver una lista de las regiones en las que puedes ejecutar una tarea de Dataflow, consulta Ubicaciones de Dataflow.
- En el menú desplegable Plantilla de flujo de datos, seleccione the Pub/Sub Avro to BigQuery template.
- En los campos de parámetros proporcionados, introduzca los valores de los parámetros.
- Haz clic en Ejecutar trabajo.
gcloud
En tu shell o terminal, ejecuta la plantilla:
gcloud dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Avro_to_BigQuery \ --parameters \ schemaPath=SCHEMA_PATH,\ inputSubscription=SUBSCRIPTION_NAME,\ outputTableSpec=BIGQUERY_TABLE,\ outputTopic=DEADLETTER_TOPIC
Haz los cambios siguientes:
JOB_NAME
: un nombre de trabajo único que elijasREGION_NAME
: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo,us-central1
VERSION
: la versión de la plantilla que quieres usarPuedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta principal sin fecha del contenedor: gs://dataflow-templates-REGION_NAME/latest/- el nombre de la versión, como
2023-09-12-00_RC00
, para usar una versión específica de la plantilla, que se encuentra anidada en la carpeta principal correspondiente con la fecha en el bucket: gs://dataflow-templates-REGION_NAME/
SCHEMA_PATH
: la ruta de Cloud Storage al archivo de esquema de Avro (por ejemplo,gs://MyBucket/file.avsc
)SUBSCRIPTION_NAME
: el nombre de la suscripción de entrada de Pub/SubBIGQUERY_TABLE
: nombre de la tabla de salida de BigQueryDEADLETTER_TOPIC
: el tema de Pub/Sub que se va a usar para la cola sin procesar.
API
Para ejecutar la plantilla mediante la API REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus ámbitos de autorización, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_Avro_to_BigQuery", "parameters": { "schemaPath": "SCHEMA_PATH", "inputSubscription": "SUBSCRIPTION_NAME", "outputTableSpec": "BIGQUERY_TABLE", "outputTopic": "DEADLETTER_TOPIC" } } }
Haz los cambios siguientes:
JOB_NAME
: un nombre de trabajo único que elijasLOCATION
: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo,us-central1
VERSION
: la versión de la plantilla que quieres usarPuedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta principal sin fecha del contenedor: gs://dataflow-templates-REGION_NAME/latest/- el nombre de la versión, como
2023-09-12-00_RC00
, para usar una versión específica de la plantilla, que se encuentra anidada en la carpeta principal correspondiente con la fecha en el bucket: gs://dataflow-templates-REGION_NAME/
SCHEMA_PATH
: la ruta de Cloud Storage al archivo de esquema de Avro (por ejemplo,gs://MyBucket/file.avsc
)SUBSCRIPTION_NAME
: el nombre de la suscripción de entrada de Pub/SubBIGQUERY_TABLE
: nombre de la tabla de salida de BigQueryDEADLETTER_TOPIC
: el tema de Pub/Sub que se va a usar para la cola sin procesar.
Siguientes pasos
- Consulta información sobre las plantillas de Dataflow.
- Consulta la lista de plantillas proporcionadas por Google.