File CSV di Cloud Storage nel modello BigQuery

La pipeline da file CSV di Cloud Storage a BigQuery è una pipeline batch che consente di leggere i dati dai file CSV archiviati in Cloud Storage e aggiungere il risultato a una tabella BigQuery. I file CSV possono essere decompressi o compressi nei formati elencati nella pagina dell'SDK Enum.Compression

Requisiti della pipeline

Per utilizzare questo modello, la pipeline deve soddisfare i seguenti requisiti.

File JSON dello schema BigQuery

Crea un file JSON che descriva lo schema BigQuery. Assicurati che lo schema abbia un array JSON di primo livello denominato BigQuery Schema e che i suoi contenuti rispettino il pattern {"name": "COLUMN_NAME", "type": "DATA_TYPE"}.

Il modello batch Cloud Storage CSV files to BigQuery non supporta l'importazione di dati nei campi STRUCT (Record) nella tabella BigQuery di destinazione.

Il seguente JSON descrive un esempio di schema BigQuery:

{
  "BigQuery Schema": [
    {
      "name": "location",
      "type": "STRING"
    },
    {
      "name": "name",
      "type": "STRING"
    },
    {
      "name": "age",
      "type": "STRING"
    },
    {
      "name": "color",
      "type": "STRING"
    },
    {
      "name": "coffee",
      "type": "STRING"
    }
  ]
}

Schema della tabella degli errori

La tabella BigQuery che archivia i record rifiutati dai file CSV deve corrispondere allo schema della tabella definito qui.

{
  "BigQuery Schema": [
    {
      "name": "RawContent",
      "type": "STRING"
    },
    {
      "name": "ErrorMsg",
      "type": "STRING"
    }
  ]
}

Parametri del modello

Parametri obbligatori

  • inputFilePattern: il percorso Cloud Storage del file CSV contenente il testo da elaborare. Ad esempio, gs://your-bucket/path/*.csv.
  • schemaJSONPath: il percorso Cloud Storage del file JSON che definisce lo schema BigQuery.
  • outputTable: il nome della tabella BigQuery che archivia i dati elaborati. Se riutilizzi una tabella BigQuery esistente, i dati vengono aggiunti alla tabella di destinazione.
  • bigQueryLoadingTemporaryDirectory: la directory temporanea da utilizzare durante il processo di caricamento di BigQuery. Ad esempio, gs://your-bucket/your-files/temp_dir.
  • badRecordsOutputTable: il nome della tabella BigQuery da utilizzare per archiviare i dati rifiutati durante l'elaborazione dei file CSV. Se riutilizzi una tabella BigQuery esistente, i dati vengono aggiunti alla tabella di destinazione. Lo schema di questa tabella deve corrispondere allo schema della tabella degli errori (https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-storage-csv-to-bigquery#GcsCSVToBigQueryBadRecordsSchema).
  • Delimitatore: il delimitatore di colonna utilizzato dal file CSV. Ad esempio, ,.
  • csvFormat: il formato CSV in base al formato Apache Commons CSV. Valore predefinito: Default.

Parametri facoltativi

  • containsHeaders: indica se le intestazioni sono incluse nel file CSV. Valore predefinito: false.
  • csvFileEncoding: il formato di codifica dei caratteri del file CSV. I valori consentiti sono US-ASCII, ISO-8859-1, UTF-8 e UTF-16. Il valore predefinito è UTF-8.

Esegui il modello

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.

  5. Dal menu a discesa Modello di dataflow, seleziona the CSV files on Cloud Storage to BigQuery (Batch) template.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. Fai clic su Esegui job.

gcloud

Nella shell o nel terminale, esegui il modello:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/GCS_CSV_to_BigQuery \
    --region REGION_NAME \
    --parameters \
inputFilePattern=PATH_TO_CSV_DATA,\
schemaJSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
outputTable=BIGQUERY_DESTINATION_TABLE,\
badRecordsOutputTable=BIGQUERY_BAD_RECORDS_TABLE,\
csvFormat=CSV_FORMAT,\
delimiter=DELIMITER,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS,\
containsHeaders=CONTAINS_HEADERS,\
csvFileEncoding=CSV_FILE_ENCODING

Sostituisci quanto segue:

  • PROJECT_ID: l'ID progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome univoco del job a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • PATH_TO_CSV_DATA: il percorso Cloud Storage dei tuoi file CSV
  • PATH_TO_BIGQUERY_SCHEMA_JSON: il percorso Cloud Storage del file JSON contenente la definizione dello schema
  • BIGQUERY_DESTINATION_TABLE: il nome della tabella di destinazione BigQuery
  • BIGQUERY_BAD_RECORDS_TABLE: il nome della tabella BigQuery dei record non validi
  • PATH_TO_TEMP_DIR_ON_GCS: il percorso Cloud Storage della directory temporanea
  • DELIMITER: Delimitatore del file CSV
  • CSV_FORMAT: specifica del formato CSV per l'analisi dei record
  • CONTAINS_HEADERS: indica se i file CSV contengono intestazioni
  • CSV_FILE_ENCODING: codifica nei file CSV

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/GCS_CSV_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern":"PATH_TO_CSV_DATA",
       "schemaJSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "outputTable":"BIGQUERY_DESTINATION_TABLE",
       "badRecordsOutputTable":"BIGQUERY_BAD_RECORDS_TABLE",
       "csvFormat":"CSV_FORMAT",
       "delimiter":"DELIMITER",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS",
       "containsHeaders": "CONTAINS_HEADERS",
       "csvFileEncoding": "CSV_FILE_ENCODING"
   },
   "environment": { "zone": "us-central1-f" }
}

Sostituisci quanto segue:

  • PROJECT_ID: l'ID progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome univoco del job a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • PATH_TO_CSV_DATA: il percorso Cloud Storage dei tuoi file CSV
  • PATH_TO_BIGQUERY_SCHEMA_JSON: il percorso Cloud Storage del file JSON contenente la definizione dello schema
  • BIGQUERY_DESTINATION_TABLE: il nome della tabella di destinazione BigQuery
  • BIGQUERY_BAD_RECORDS_TABLE: il nome della tabella BigQuery dei record non validi
  • PATH_TO_TEMP_DIR_ON_GCS: il percorso Cloud Storage della directory temporanea
  • DELIMITER: Delimitatore del file CSV
  • CSV_FORMAT: specifica del formato CSV per l'analisi dei record
  • CONTAINS_HEADERS: indica se i file CSV contengono intestazioni
  • CSV_FILE_ENCODING: codifica nei file CSV

Passaggi successivi