Plantilla de archivos CSV de Cloud Storage a BigQuery

La canalización de archivos CSV de Cloud Storage a BigQuery es una canalización por lotes que te permite leer datos de archivos CSV almacenados en Cloud Storage y adjuntar el resultado a una tabla de BigQuery. Los archivos CSV se pueden descomprimir o comprimir en formatos que se indican en la página del SDK del enum Compression.

Requisitos de la canalización

Para usar esta plantilla, tu canalización debe cumplir con los siguientes requisitos.

Archivo JSON del esquema de BigQuery

Crea un archivo JSON que describa tu esquema de BigQuery. Asegúrate de que el esquema tenga un array JSON de nivel superior titulado BigQuery Schema y que su contenido siga el patrón {"name": "COLUMN_NAME", "type": "DATA_TYPE"}.

La plantilla de archivos CSV de Cloud Storage a BigQuery no admite la importación de datos a los campos STRUCT (Record) en la tabla de BigQuery de destino.

En el siguiente JSON, se describe un esquema de BigQuery de ejemplo:

{
  "BigQuery Schema": [
    {
      "name": "location",
      "type": "STRING"
    },
    {
      "name": "name",
      "type": "STRING"
    },
    {
      "name": "age",
      "type": "STRING"
    },
    {
      "name": "color",
      "type": "STRING"
    },
    {
      "name": "coffee",
      "type": "STRING"
    }
  ]
}

Esquema de la tabla de errores

La tabla de BigQuery que almacena los registros rechazados de los archivos CSV debe coincidir con el esquema de tabla definido aquí.

{
  "BigQuery Schema": [
    {
      "name": "RawContent",
      "type": "STRING"
    },
    {
      "name": "ErrorMsg",
      "type": "STRING"
    }
  ]
}

Parámetros de la plantilla

Parámetros obligatorios

  • inputFilePattern: Es la ruta de acceso de Cloud Storage al archivo CSV que contiene el texto que se debe procesar. Por ejemplo, gs://your-bucket/path/*.csv
  • schemaJSONPath: La ruta de Cloud Storage al archivo JSON que define tu esquema de BigQuery.
  • outputTable: Es el nombre de la tabla de BigQuery que almacena tus datos procesados. Si vuelves a usar una tabla de BigQuery existente, los datos se agregan a la tabla de destino.
  • bigQueryLoadingTemporaryDirectory: Es el directorio temporal que se usará durante el proceso de carga de BigQuery. Por ejemplo, gs://your-bucket/your-files/temp_dir
  • badRecordsOutputTable: Es el nombre de la tabla de BigQuery que se usará para almacenar los datos rechazados cuando se procesan los archivos CSV. Si vuelves a usar una tabla de BigQuery existente, los datos se agregan a la tabla de destino. El esquema de esta tabla debe coincidir con el esquema de la tabla de errores (https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-storage-csv-to-bigquery#GcsCSVToBigQueryBadRecordsSchema).
  • delimiter: Es el delimitador de columnas que usa el archivo CSV. Por ejemplo, ,
  • csvFormat: El formato CSV según el formato CSV de Apache Commons. La configuración predeterminada es Default.

Parámetros opcionales

  • containsHeaders: Indica si los encabezados se incluyen en el archivo CSV. La configuración predeterminada es false.
  • csvFileEncoding: Es el formato de codificación de caracteres del archivo CSV. Los valores permitidos son US-ASCII, ISO-8859-1, UTF-8 y UTF-16. La configuración predeterminada es UTF-8.

Ejecuta la plantilla

Console

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the CSV files on Cloud Storage to BigQuery (Batch) template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/GCS_CSV_to_BigQuery \
    --region REGION_NAME \
    --parameters \
inputFilePattern=PATH_TO_CSV_DATA,\
schemaJSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
outputTable=BIGQUERY_DESTINATION_TABLE,\
badRecordsOutputTable=BIGQUERY_BAD_RECORDS_TABLE,\
csvFormat=CSV_FORMAT,\
delimiter=DELIMITER,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS,\
containsHeaders=CONTAINS_HEADERS,\
csvFileEncoding=CSV_FILE_ENCODING

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • REGION_NAME: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • PATH_TO_CSV_DATA: la ruta de acceso de Cloud Storage a tus archivos CSV.
  • PATH_TO_BIGQUERY_SCHEMA_JSON: Es la ruta de acceso de Cloud Storage al archivo JSON que contiene la definición de esquema.
  • BIGQUERY_DESTINATION_TABLE: el nombre de la tabla de destino de BigQuery.
  • BIGQUERY_BAD_RECORDS_TABLE: el nombre de la tabla de registros incorrectos de BigQuery
  • PATH_TO_TEMP_DIR_ON_GCS: Es la ruta de acceso de Cloud Storage al directorio temporal.
  • DELIMITER: delimitador del archivo CSV
  • CSV_FORMAT: especificación de formato CSV para analizar registros
  • CONTAINS_HEADERS: si los archivos CSV contienen encabezados
  • CSV_FILE_ENCODING: codificación en los archivos CSV

API

Para ejecutar la plantilla con la API de REST, envía una solicitud POST HTTP. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/GCS_CSV_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern":"PATH_TO_CSV_DATA",
       "schemaJSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "outputTable":"BIGQUERY_DESTINATION_TABLE",
       "badRecordsOutputTable":"BIGQUERY_BAD_RECORDS_TABLE",
       "csvFormat":"CSV_FORMAT",
       "delimiter":"DELIMITER",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS",
       "containsHeaders": "CONTAINS_HEADERS",
       "csvFileEncoding": "CSV_FILE_ENCODING"
   },
   "environment": { "zone": "us-central1-f" }
}

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • LOCATION: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • PATH_TO_CSV_DATA: la ruta de acceso de Cloud Storage a tus archivos CSV.
  • PATH_TO_BIGQUERY_SCHEMA_JSON: Es la ruta de acceso de Cloud Storage al archivo JSON que contiene la definición de esquema.
  • BIGQUERY_DESTINATION_TABLE: el nombre de la tabla de destino de BigQuery.
  • BIGQUERY_BAD_RECORDS_TABLE: el nombre de la tabla de registros incorrectos de BigQuery
  • PATH_TO_TEMP_DIR_ON_GCS: Es la ruta de acceso de Cloud Storage al directorio temporal.
  • DELIMITER: delimitador del archivo CSV
  • CSV_FORMAT: especificación de formato CSV para analizar registros
  • CONTAINS_HEADERS: si los archivos CSV contienen encabezados
  • CSV_FILE_ENCODING: codificación en los archivos CSV

¿Qué sigue?