Il modello di protocollo Pub/Sub-BigQuery è una pipeline in modalità flusso che importa dati di protocollo da una sottoscrizione Pub/Sub a una tabella BigQuery.
Eventuali errori che si verificano durante la scrittura nella tabella BigQuery vengono inseriti in modalità flusso in un argomento Pub/Sub non elaborato.
Per trasformare i dati, è possibile fornire una funzione definita dall'utente (UDF) JavaScript. Gli errori durante l'esecuzione dell'UDF possono essere inviati a un argomento Pub/Sub separato o allo stesso argomento non elaborato degli errori BigQuery.
Prima di eseguire una pipeline Dataflow per questo scenario, valuta se una sottoscrizione BigQuery di Pub/Sub con una UDF soddisfa i tuoi requisiti.
Requisiti della pipeline
- Deve esistere la sottoscrizione Pub/Sub di input.
- In Cloud Storage deve esistere il file di schema per i record Proto.
- Deve esistere l'argomento Pub/Sub di output.
- Deve esistere il set di dati BigQuery di output.
- Se la tabella BigQuery esiste, deve avere uno schema corrispondente ai dati proto indipendentemente dal valore di
createDisposition
.
Parametri del modello
Parametri obbligatori
- protoSchemaPath (percorso Cloud Storage del file di schema proto): percorso Cloud Storage di un file di set di descrittori autonomo. Esempio: gs://MyBucket/schema.pb.
schema.pb
può essere generato aggiungendo--descriptor_set_out=schema.pb
al comandoprotoc
che compila i proto. Il flag--include_imports
può essere utilizzato per garantire che il file sia autonomo. - fullMessageName (nome completo del messaggio Proto): il nome completo del messaggio (ad esempio package.name.MessageName). Se il messaggio è nidificato all'interno di un altro messaggio, includi tutti i messaggi con il delimitatore "." (ad esempio: package.name.OuterMessage.InnerMessage). "package.name" deve provenire dall'istruzione
package
, non dall'istruzionejava_package
. - inputSubscription (sottoscrizione di input Pub/Sub): sottoscrizione Pub/Sub da cui leggere l'input, nel formato "projects/your-project-id/subscriptions/your-subscription-name" (esempio: projects/your-project-id/subscriptions/your-subscription-name).
- outputTableSpec (tabella di output BigQuery): la posizione della tabella BigQuery in cui scrivere l'output. Il nome deve essere nel formato
<project>:<dataset>.<table_name>
. Lo schema della tabella deve corrispondere agli oggetti di input. - outputTopic (argomento Pub/Sub di output): il nome dell'argomento a cui devono essere pubblicati i dati, nel formato "projects/your-project-id/topics/your-topic-name" (esempio: projects/your-project-id/topics/your-topic-name).
Parametri facoltativi
- preserveProtoFieldNames (Preserva nomi campi proto): flag per controllare se i nomi dei campi proto devono essere mantenuti o convertiti in lowerCamelCase. Se la tabella esiste già, questo valore deve essere basato su ciò che corrisponde allo schema della tabella. In caso contrario, determinerà i nomi delle colonne della tabella creata. True per preservare snake_case proto. False convertirà i campi in lowerCamelCase. (valore predefinito: false).
- bigQueryTableSchemaPath (percorso dello schema della tabella BigQuery): percorso Cloud Storage del file JSON dello schema BigQuery. Se non viene impostato, lo schema viene dedotto dallo schema Proto. (Esempio: gs://MyBucket/bq_schema.json).
- udfOutputTopic (argomento di output Pub/Sub per gli errori UDF): un argomento di output facoltativo a cui inviare gli errori UDF. Se questa opzione non è impostata, gli errori verranno scritti nello stesso argomento degli errori BigQuery. Esempio: projects/your-project-id/topics/your-topic-name.
- writeDisposition (istruzione di scrittura da utilizzare per BigQuery): istruzione di scrittura di BigQuery. Ad esempio, WRITE_APPEND, WRITE_EMPTY o WRITE_TRUNCATE. Il valore predefinito è WRITE_APPEND.
- createDisposition (crea disposizione da utilizzare per BigQuery): BigQuery CreateDisposition. Ad esempio, CREATE_IF_NEEDED, CREATE_NEVER. Il valore predefinito è CREATE_IF_NEEDED.
- javascriptTextTransformGcsPath (percorso Cloud Storage all'origine della funzione JavaScript definita dall'utente): il pattern del percorso Cloud Storage per il codice JavaScript contenente le funzioni definite dall'utente. (Esempio: gs://your-bucket/your-function.js).
- javascriptTextTransformFunctionName (nome della funzione JavaScript definita dall'utente): il nome della funzione da chiamare dal file JavaScript. Utilizza solo lettere, cifre e trattini bassi. Esempio: "transform" o "transform_udf1".
- javascriptTextTransformReloadIntervalMinutes (intervallo di ricaricamento automatico della UDF JavaScript (minuti)): definisci l'intervallo in cui i worker possono verificare la presenza di modifiche alla UDF JavaScript per ricaricare i file. Il valore predefinito è 0.
- useStorageWriteApi (Utilizza l'API BigQuery Storage Write): se è true, la pipeline utilizza l'API Storage Write quando scrive i dati in BigQuery (vedi https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api). Il valore predefinito è false. Quando utilizzi l'API Storage Write in modalità exactly-once, devi impostare i seguenti parametri: "Numero di stream per l'API BigQuery Storage Write" e "Frequenza di attivazione in secondi per l'API BigQuery Storage Write". Se attivi la modalità Dataflow at-least-once o imposti il parametro useStorageWriteApiAtLeastOnce su true, non devi impostare il numero di stream o la frequenza di attivazione.
- useStorageWriteApiAtLeastOnce (Utilizza la semantica almeno una volta nell'API BigQuery Storage Write): questo parametro ha effetto solo se l'opzione "Utilizza l'API BigQuery Storage Write" è attivata. Se abilitata, la semantica at-least-once verrà utilizzata per l'API Storage Write, altrimenti verrà utilizzata la semantica exactly-once. Il valore predefinito è false.
- numStorageWriteApiStreams (numero di flussi per l'API BigQuery Storage Write): il numero di flussi definisce il parallelismo della trasformazione Write di BigQueryIO e corrisponde approssimativamente al numero di flussi dell'API Storage Write che verranno utilizzati dalla pipeline. Per i valori consigliati, consulta https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api. Il valore predefinito è 0.
- storageWriteApiTriggeringFrequencySec (frequenza di attivazione in secondi per l'API BigQuery Storage Write): la frequenza di attivazione determina la velocità con cui i dati saranno visibili per le query in BigQuery. Per i valori consigliati, consulta https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api.
Funzione definita dall'utente
Se vuoi, puoi estendere questo modello scrivendo una funzione definita dall'utente (UDF). Il modello chiama la UDF per ogni elemento di input. I payload degli elementi vengono serializzati come stringhe JSON. Per saperne di più, consulta Creare funzioni definite dall'utente per i modelli Dataflow.
Specifiche della funzione
La UDF ha le seguenti specifiche:
Esegui il modello
Console
- Vai alla pagina Crea job da modello di Dataflow. Vai a Crea job da modello
- Nel campo Nome job, inserisci un nome univoco per il job.
- (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione
predefinita è
us-central1
.Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.
- Dal menu a discesa Modello Dataflow, seleziona the Pub/Sub Proto to BigQuery template.
- Nei campi dei parametri forniti, inserisci i valori dei parametri.
- Fai clic su Esegui job.
gcloud
Nella shell o nel terminale, esegui il modello:
gcloud dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Flex \ --parameters \ schemaPath=SCHEMA_PATH,\ fullMessageName=PROTO_MESSAGE_NAME,\ inputSubscription=SUBSCRIPTION_NAME,\ outputTableSpec=BIGQUERY_TABLE,\ outputTopic=UNPROCESSED_TOPIC
Sostituisci quanto segue:
JOB_NAME
: un nome univoco del job a tua sceltaREGION_NAME
: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempious-central1
VERSION
: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latest
per utilizzare l'ultima versione del modello, disponibile nella cartella principale senza data nel bucket: gs://dataflow-templates-REGION_NAME/latest/- il nome della versione, ad esempio
2023-09-12-00_RC00
, per utilizzare una versione specifica del modello, che si trova nidificata nella rispettiva cartella principale con data nel bucket: gs://dataflow-templates-REGION_NAME/
SCHEMA_PATH
: il percorso Cloud Storage del file schema Proto (ad esempio,gs://MyBucket/file.pb
)PROTO_MESSAGE_NAME
: il nome del messaggio Proto (ad esempiopackage.name.MessageName
)SUBSCRIPTION_NAME
: il nome della sottoscrizione Pub/Sub di inputBIGQUERY_TABLE
: il nome della tabella di output BigQueryUNPROCESSED_TOPIC
: l'argomento Pub/Sub da utilizzare per la coda non elaborata
API
Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Flex", "parameters": { "schemaPath": "SCHEMA_PATH", "fullMessageName": "PROTO_MESSAGE_NAME", "inputSubscription": "SUBSCRIPTION_NAME", "outputTableSpec": "BIGQUERY_TABLE", "outputTopic": "UNPROCESSED_TOPIC" } } }
Sostituisci quanto segue:
PROJECT_ID
: l'ID progetto Google Cloud in cui vuoi eseguire il job DataflowJOB_NAME
: un nome univoco del job a tua sceltaLOCATION
: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempious-central1
VERSION
: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latest
per utilizzare l'ultima versione del modello, disponibile nella cartella principale senza data nel bucket: gs://dataflow-templates-REGION_NAME/latest/- il nome della versione, ad esempio
2023-09-12-00_RC00
, per utilizzare una versione specifica del modello, che si trova nidificata nella rispettiva cartella principale con data nel bucket: gs://dataflow-templates-REGION_NAME/
SCHEMA_PATH
: il percorso Cloud Storage del file schema Proto (ad esempio,gs://MyBucket/file.pb
)PROTO_MESSAGE_NAME
: il nome del messaggio Proto (ad esempiopackage.name.MessageName
)SUBSCRIPTION_NAME
: il nome della sottoscrizione Pub/Sub di inputBIGQUERY_TABLE
: il nome della tabella di output BigQueryUNPROCESSED_TOPIC
: l'argomento Pub/Sub da utilizzare per la coda non elaborata
Passaggi successivi
- Scopri di più sui modelli Dataflow.
- Consulta l'elenco dei modelli forniti da Google.