Modello di protocollo Pub/Sub a BigQuery

Il modello di protocollo Pub/Sub-BigQuery è una pipeline in modalità flusso che importa dati di protocollo da una sottoscrizione Pub/Sub a una tabella BigQuery. Eventuali errori che si verificano durante la scrittura nella tabella BigQuery vengono inseriti in modalità flusso in un argomento Pub/Sub non elaborato.

Per trasformare i dati, è possibile fornire una funzione definita dall'utente (UDF) JavaScript. Gli errori durante l'esecuzione dell'UDF possono essere inviati a un argomento Pub/Sub separato o allo stesso argomento non elaborato degli errori BigQuery.

Prima di eseguire una pipeline Dataflow per questo scenario, valuta se una sottoscrizione BigQuery di Pub/Sub con una UDF soddisfa i tuoi requisiti.

Requisiti della pipeline

  • Deve esistere la sottoscrizione Pub/Sub di input.
  • In Cloud Storage deve esistere il file di schema per i record Proto.
  • Deve esistere l'argomento Pub/Sub di output.
  • Deve esistere il set di dati BigQuery di output.
  • Se la tabella BigQuery esiste, deve avere uno schema corrispondente ai dati proto indipendentemente dal valore di createDisposition.

Parametri del modello

Parametri obbligatori

  • protoSchemaPath (percorso Cloud Storage del file di schema proto): percorso Cloud Storage di un file di set di descrittori autonomo. Esempio: gs://MyBucket/schema.pb. schema.pb può essere generato aggiungendo --descriptor_set_out=schema.pb al comando protoc che compila i proto. Il flag --include_imports può essere utilizzato per garantire che il file sia autonomo.
  • fullMessageName (nome completo del messaggio Proto): il nome completo del messaggio (ad esempio package.name.MessageName). Se il messaggio è nidificato all'interno di un altro messaggio, includi tutti i messaggi con il delimitatore "." (ad esempio: package.name.OuterMessage.InnerMessage). "package.name" deve provenire dall'istruzione package, non dall'istruzione java_package.
  • inputSubscription (sottoscrizione di input Pub/Sub): sottoscrizione Pub/Sub da cui leggere l'input, nel formato "projects/your-project-id/subscriptions/your-subscription-name" (esempio: projects/your-project-id/subscriptions/your-subscription-name).
  • outputTableSpec (tabella di output BigQuery): la posizione della tabella BigQuery in cui scrivere l'output. Il nome deve essere nel formato <project>:<dataset>.<table_name>. Lo schema della tabella deve corrispondere agli oggetti di input.
  • outputTopic (argomento Pub/Sub di output): il nome dell'argomento a cui devono essere pubblicati i dati, nel formato "projects/your-project-id/topics/your-topic-name" (esempio: projects/your-project-id/topics/your-topic-name).

Parametri facoltativi

  • preserveProtoFieldNames (Preserva nomi campi proto): flag per controllare se i nomi dei campi proto devono essere mantenuti o convertiti in lowerCamelCase. Se la tabella esiste già, questo valore deve essere basato su ciò che corrisponde allo schema della tabella. In caso contrario, determinerà i nomi delle colonne della tabella creata. True per preservare snake_case proto. False convertirà i campi in lowerCamelCase. (valore predefinito: false).
  • bigQueryTableSchemaPath (percorso dello schema della tabella BigQuery): percorso Cloud Storage del file JSON dello schema BigQuery. Se non viene impostato, lo schema viene dedotto dallo schema Proto. (Esempio: gs://MyBucket/bq_schema.json).
  • udfOutputTopic (argomento di output Pub/Sub per gli errori UDF): un argomento di output facoltativo a cui inviare gli errori UDF. Se questa opzione non è impostata, gli errori verranno scritti nello stesso argomento degli errori BigQuery. Esempio: projects/your-project-id/topics/your-topic-name.
  • writeDisposition (istruzione di scrittura da utilizzare per BigQuery): istruzione di scrittura di BigQuery. Ad esempio, WRITE_APPEND, WRITE_EMPTY o WRITE_TRUNCATE. Il valore predefinito è WRITE_APPEND.
  • createDisposition (crea disposizione da utilizzare per BigQuery): BigQuery CreateDisposition. Ad esempio, CREATE_IF_NEEDED, CREATE_NEVER. Il valore predefinito è CREATE_IF_NEEDED.
  • javascriptTextTransformGcsPath (percorso Cloud Storage all'origine della funzione JavaScript definita dall'utente): il pattern del percorso Cloud Storage per il codice JavaScript contenente le funzioni definite dall'utente. (Esempio: gs://your-bucket/your-function.js).
  • javascriptTextTransformFunctionName (nome della funzione JavaScript definita dall'utente): il nome della funzione da chiamare dal file JavaScript. Utilizza solo lettere, cifre e trattini bassi. Esempio: "transform" o "transform_udf1".
  • javascriptTextTransformReloadIntervalMinutes (intervallo di ricaricamento automatico della UDF JavaScript (minuti)): definisci l'intervallo in cui i worker possono verificare la presenza di modifiche alla UDF JavaScript per ricaricare i file. Il valore predefinito è 0.
  • useStorageWriteApi (Utilizza l'API BigQuery Storage Write): se è true, la pipeline utilizza l'API Storage Write quando scrive i dati in BigQuery (vedi https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api). Il valore predefinito è false. Quando utilizzi l'API Storage Write in modalità exactly-once, devi impostare i seguenti parametri: "Numero di stream per l'API BigQuery Storage Write" e "Frequenza di attivazione in secondi per l'API BigQuery Storage Write". Se attivi la modalità Dataflow at-least-once o imposti il parametro useStorageWriteApiAtLeastOnce su true, non devi impostare il numero di stream o la frequenza di attivazione.
  • useStorageWriteApiAtLeastOnce (Utilizza la semantica almeno una volta nell'API BigQuery Storage Write): questo parametro ha effetto solo se l'opzione "Utilizza l'API BigQuery Storage Write" è attivata. Se abilitata, la semantica at-least-once verrà utilizzata per l'API Storage Write, altrimenti verrà utilizzata la semantica exactly-once. Il valore predefinito è false.
  • numStorageWriteApiStreams (numero di flussi per l'API BigQuery Storage Write): il numero di flussi definisce il parallelismo della trasformazione Write di BigQueryIO e corrisponde approssimativamente al numero di flussi dell'API Storage Write che verranno utilizzati dalla pipeline. Per i valori consigliati, consulta https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api. Il valore predefinito è 0.
  • storageWriteApiTriggeringFrequencySec (frequenza di attivazione in secondi per l'API BigQuery Storage Write): la frequenza di attivazione determina la velocità con cui i dati saranno visibili per le query in BigQuery. Per i valori consigliati, consulta https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api.

Funzione definita dall'utente

Se vuoi, puoi estendere questo modello scrivendo una funzione definita dall'utente (UDF). Il modello chiama la UDF per ogni elemento di input. I payload degli elementi vengono serializzati come stringhe JSON. Per saperne di più, consulta Creare funzioni definite dall'utente per i modelli Dataflow.

Specifiche della funzione

La UDF ha le seguenti specifiche:

  • Input: il campo dati del messaggio Pub/Sub, serializzato come stringa JSON.
  • Output: una stringa JSON che corrisponde allo schema della tabella BigQuery di destinazione.
  • Esegui il modello

    Console

    1. Vai alla pagina Crea job da modello di Dataflow.
    2. Vai a Crea job da modello
    3. Nel campo Nome job, inserisci un nome univoco per il job.
    4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

      Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.

    5. Dal menu a discesa Modello Dataflow, seleziona the Pub/Sub Proto to BigQuery template.
    6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
    7. Fai clic su Esegui job.

    gcloud

    Nella shell o nel terminale, esegui il modello:

    gcloud dataflow flex-template run JOB_NAME \
        --region=REGION_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Flex \
        --parameters \
    schemaPath=SCHEMA_PATH,\
    fullMessageName=PROTO_MESSAGE_NAME,\
    inputSubscription=SUBSCRIPTION_NAME,\
    outputTableSpec=BIGQUERY_TABLE,\
    outputTopic=UNPROCESSED_TOPIC
      

    Sostituisci quanto segue:

    • JOB_NAME: un nome univoco del job a tua scelta
    • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
    • VERSION: la versione del modello che vuoi utilizzare

      Puoi utilizzare i seguenti valori:

    • SCHEMA_PATH: il percorso Cloud Storage del file schema Proto (ad esempio, gs://MyBucket/file.pb)
    • PROTO_MESSAGE_NAME: il nome del messaggio Proto (ad esempio package.name.MessageName)
    • SUBSCRIPTION_NAME: il nome della sottoscrizione Pub/Sub di input
    • BIGQUERY_TABLE: il nome della tabella di output BigQuery
    • UNPROCESSED_TOPIC: l'argomento Pub/Sub da utilizzare per la coda non elaborata

    API

    Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Flex",
          "parameters": {
              "schemaPath": "SCHEMA_PATH",
              "fullMessageName": "PROTO_MESSAGE_NAME",
              "inputSubscription": "SUBSCRIPTION_NAME",
              "outputTableSpec": "BIGQUERY_TABLE",
              "outputTopic": "UNPROCESSED_TOPIC"
          }
       }
    }
      

    Sostituisci quanto segue:

    • PROJECT_ID: l'ID progetto Google Cloud in cui vuoi eseguire il job Dataflow
    • JOB_NAME: un nome univoco del job a tua scelta
    • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
    • VERSION: la versione del modello che vuoi utilizzare

      Puoi utilizzare i seguenti valori:

    • SCHEMA_PATH: il percorso Cloud Storage del file schema Proto (ad esempio, gs://MyBucket/file.pb)
    • PROTO_MESSAGE_NAME: il nome del messaggio Proto (ad esempio package.name.MessageName)
    • SUBSCRIPTION_NAME: il nome della sottoscrizione Pub/Sub di input
    • BIGQUERY_TABLE: il nome della tabella di output BigQuery
    • UNPROCESSED_TOPIC: l'argomento Pub/Sub da utilizzare per la coda non elaborata

    Passaggi successivi