Modello Java Database Connectivity (JDBC) a BigQuery

Il modello JDBC to BigQuery è una pipeline batch che copia i dati da una tabella di database relazionale in una tabella BigQuery esistente. Questa pipeline utilizza JDBC per connettersi al database relazionale. Utilizza questo modello per copiare i dati da qualsiasi database relazionale con i driver JDBC disponibili in BigQuery.

Per un ulteriore livello di protezione, puoi passare una chiave Cloud KMS, insieme a parametri di stringa di connessione, nome utente e password codificati in Base64 criptati con la chiave Cloud KMS. Per ulteriori dettagli sulla crittografia del nome utente, della password e dei parametri della stringa di connessione, consulta l'endpoint di crittografia dell'API Cloud KMS.

Requisiti della pipeline

  • I driver JDBC per il database relazionale devono essere disponibili.
  • La tabella BigQuery deve esistere prima dell'esecuzione della pipeline.
  • La tabella BigQuery deve avere uno schema compatibile.
  • Il database relazionale deve essere accessibile dalla subnet in cui viene eseguito Dataflow.

Parametri del modello

Parametri obbligatori

  • driverJars: l'elenco separato da virgole dei file JAR del driver. Ad esempio, gs://your-bucket/driver_jar1.jar,gs://your-bucket/driver_jar2.jar.
  • driverClassName: il nome della classe del driver JDBC. Ad esempio, com.mysql.jdbc.Driver.
  • connectionURL: la stringa dell'URL di connessione JDBC. Ad esempio, jdbc:mysql://some-host:3306/sampledb. Puoi passare questo valore come stringa criptata con una chiave Cloud KMS e poi codificata in Base64. Rimuovi i caratteri di spaziatura dalla stringa codificata Base64. Tieni presente la differenza tra una stringa di connessione al database Oracle non RAC (jdbc:oracle:thin:@some-host:<port>:<sid>) e una stringa di connessione al database Oracle RAC (jdbc:oracle:thin:@//some-host[:<port>]/<service_name>). Ad esempio, jdbc:mysql://some-host:3306/sampledb.
  • outputTable: la posizione della tabella di output BigQuery. Ad esempio, <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>.
  • bigQueryLoadingTemporaryDirectory: la directory temporanea per il processo di caricamento di BigQuery. Ad esempio, gs://your-bucket/your-files/temp_dir.

Parametri facoltativi

  • connectionProperties: la stringa di proprietà da utilizzare per la connessione JDBC. Il formato della stringa deve essere [propertyName=property;]*.Per ulteriori informazioni, consulta le proprietà di configurazione (https://dev.mysql.com/doc/connector-j/it/connector-j-reference-configuration-properties.html) nella documentazione di MySQL. Ad esempio: unicode=true;characterEncoding=UTF-8.
  • username: il nome utente da utilizzare per la connessione JDBC. Può essere passato come stringa criptata con una chiave Cloud KMS o come secret di Secret Manager nel formato projects/{project}/secrets/{secret}/versions/{secret_version}.
  • password: la password da utilizzare per la connessione JDBC. Può essere passato come stringa criptata con una chiave Cloud KMS o come secret di Secret Manager nel formato projects/{project}/secrets/{secret}/versions/{secret_version}.
  • query: la query da eseguire sull'origine per estrarre i dati. Tieni presente che alcuni tipi JDBC SQL e BigQuery, pur condividendo lo stesso nome, presentano alcune differenze. Alcune mappature di tipi SQL -> BigQuery importanti da tenere a mente sono DATETIME --> TIMESTAMP. Potrebbe essere necessario il trasferimento di tipo se gli schemi non corrispondono. Ad esempio: select * from sampledb.sample_table.
  • KMSEncryptionKey: la chiave di crittografia Cloud KMS da utilizzare per decriptare il nome utente, la password e la stringa di connessione. Se passi una chiave Cloud KMS, devi anche criptare il nome utente, la password e la stringa di connessione. Ad esempio, projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key.
  • useColumnAlias: se impostato su true, la pipeline utilizza l'alias della colonna (AS) anziché il nome della colonna per mappare le righe a BigQuery. Il valore predefinito è false.
  • isTruncate: se impostato su true, la pipeline viene troncata prima del caricamento dei dati in BigQuery. Il valore predefinito è false, che fa sì che la pipeline aggiunga i dati.
  • partitionColumn: se viene specificato partitionColumn insieme a table, JdbcIO legge la tabella in parallelo eseguendo più istanze della query sulla stessa tabella (sottoquery) utilizzando gli intervalli. Attualmente supporta le colonne di partizione Long e DateTime. Passa il tipo di colonna tramite partitionColumnType.
  • partitionColumnType: il tipo di partitionColumn, accetta long o datetime. Valore predefinito: long.
  • table: la tabella da leggere quando si utilizzano le partizioni. Questo parametro accetta anche una sottoquery tra parentesi. Ad esempio, (select id, name from Person) as subq.
  • numPartitions: il numero di partizioni. Con i limiti inferiore e superiore, questo valore forma gli intervalli di partizione per le espressioni della clausola WHERE generate che vengono utilizzate per suddividere la colonna della partizione in modo uniforme. Quando l'input è inferiore a 1, il numero viene impostato su 1.
  • lowerBound: il limite inferiore da utilizzare nello schema di partizione. Se non viene fornito, questo valore viene dedotto automaticamente da Apache Beam per i tipi supportati. datetime partitionColumnType accetta il limite inferiore nel formato yyyy-MM-dd HH:mm:ss.SSSZ. Ad esempio: 2024-02-20 07:55:45.000+03:30.
  • upperBound: il limite superiore da utilizzare nello schema di partizione. Se non viene fornito, questo valore viene dedotto automaticamente da Apache Beam per i tipi supportati. datetime partitionColumnType accetta il limite superiore nel formato yyyy-MM-dd HH:mm:ss.SSSZ. Ad esempio: 2024-02-20 07:55:45.000+03:30.
  • fetchSize: il numero di righe da recuperare dal database alla volta. Non utilizzato per le letture partizionate. Il valore predefinito è 50000.
  • createDisposition: il valore CreateDisposition di BigQuery da utilizzare. Ad esempio, CREATE_IF_NEEDED o CREATE_NEVER. Il valore predefinito è CREATE_NEVER.
  • bigQuerySchemaPath: il percorso di Cloud Storage per lo schema JSON di BigQuery. Se createDisposition è impostato su CREATE_IF_NEEDED, questo parametro deve essere specificato. Ad esempio: gs://your-bucket/your-schema.json.
  • outputDeadletterTable: la tabella BigQuery da utilizzare per i messaggi che non sono riusciti a raggiungere la tabella di output, formattata come "PROJECT_ID:DATASET_NAME.TABLE_NAME". Se la tabella non esiste, viene creata quando viene eseguita la pipeline. Se questo parametro non viene specificato, la pipeline non andrà a buon fine in caso di errori di scrittura.Questo parametro può essere specificato solo se useStorageWriteApi o useStorageWriteApiAtLeastOnce è impostato su true.
  • disabledAlgorithms: algoritmi da disattivare separati da virgola. Se questo valore è impostato su none, nessun algoritmo è disattivato. Utilizza questo parametro con cautela, perché gli algoritmi disattivati per impostazione predefinita potrebbero presentare vulnerabilità o problemi di prestazioni. Ad esempio: SSLv3, RC4.
  • extraFilesToStage: percorsi Cloud Storage separati da virgole o secret Secret Manager per i file da eseguire in staging nel worker. Questi file vengono salvati nella directory /extra_files in ogni worker. Ad esempio, gs://<BUCKET_NAME>/file.txt,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<VERSION_ID>.
  • useStorageWriteApi: se true, la pipeline utilizza l'API BigQuery Storage di scrittura (https://cloud.google.com/bigquery/docs/write-api). Il valore predefinito è false. Per ulteriori informazioni, consulta Utilizzo dell'API Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • useStorageWriteApiAtLeastOnce: quando utilizzi l'API Storage Write, specifica la semantica di scrittura. Per utilizzare la semantica almeno una volta (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), imposta questo parametro su true. Per utilizzare la semantica esattamente una volta, imposta il parametro su false. Questo parametro si applica solo quando useStorageWriteApi è true. Il valore predefinito è false.

Esegui il modello

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.

  5. Nel menu a discesa Modello di flusso di dati, seleziona the JDBC to BigQuery with BigQuery Storage API support template.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. Fai clic su Esegui job.

gcloud

Nella shell o nel terminale, esegui il modello:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Jdbc_to_BigQuery_Flex \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       driverJars=DRIVER_JARS,\
       driverClassName=DRIVER_CLASS_NAME,\
       connectionURL=CONNECTION_URL,\
       outputTable=OUTPUT_TABLE,\
       bigQueryLoadingTemporaryDirectory=BIG_QUERY_LOADING_TEMPORARY_DIRECTORY,\

Sostituisci quanto segue:

  • JOB_NAME: un nome di job univoco a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • DRIVER_JARS: i percorsi Cloud Storage dei driver JDBC separati da virgola
  • DRIVER_CLASS_NAME: il nome della classe del driver JDBC
  • CONNECTION_URL: la stringa dell'URL di connessione JDBC.
  • OUTPUT_TABLE: la tabella di output BigQuery
  • BIG_QUERY_LOADING_TEMPORARY_DIRECTORY: la directory temporanea per il processo di caricamento di BigQuery

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "driverJars": "DRIVER_JARS",
       "driverClassName": "DRIVER_CLASS_NAME",
       "connectionURL": "CONNECTION_URL",
       "outputTable": "OUTPUT_TABLE",
       "bigQueryLoadingTemporaryDirectory": "BIG_QUERY_LOADING_TEMPORARY_DIRECTORY",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Jdbc_to_BigQuery_Flex",
     "environment": { "maxWorkers": "10" }
  }
}

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome di job univoco a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • DRIVER_JARS: i percorsi Cloud Storage dei driver JDBC separati da virgola
  • DRIVER_CLASS_NAME: il nome della classe del driver JDBC
  • CONNECTION_URL: la stringa dell'URL di connessione JDBC.
  • OUTPUT_TABLE: la tabella di output BigQuery
  • BIG_QUERY_LOADING_TEMPORARY_DIRECTORY: la directory temporanea per il processo di caricamento di BigQuery

Passaggi successivi