Modello Apache Kafka a Cloud Storage

Il modello da Apache Kafka a Cloud Storage è una pipeline di streaming che importa dati di testo da Google Cloud Managed Service per Apache Kafka e restituisce i record in Cloud Storage.

Puoi anche utilizzare il modello Apache Kafka to BigQuery con Kafka autogestito o esterno.

Requisiti della pipeline

  • Il bucket Cloud Storage di output deve esistere.
  • Il server broker Apache Kafka deve essere in esecuzione e raggiungibile dalle macchine worker Dataflow.
  • Gli argomenti Apache Kafka devono esistere.

Formato messaggi Kafka

Questo modello supporta la lettura dei messaggi da Kafka nei seguenti formati:

Formato JSON

Per leggere i messaggi JSON, imposta il parametro del modello messageFormat su "JSON".

Codifica binaria Avro

Per leggere i messaggi Avro binari, imposta i seguenti parametri del modello:

  • messageFormat: "AVRO_BINARY_ENCODING".
  • binaryAvroSchemaPath: la posizione di un file di schema Avro in Cloud Storage. Esempio: gs://BUCKET_NAME/message-schema.avsc.

Per saperne di più sul formato binario Avro, consulta Codifica binaria nella documentazione di Apache Avro.

Avro codificato nel registro di schema Confluent

Per leggere i messaggi in Avro codificato nel registro degli schemi Confluent, imposta i seguenti parametri del modello:

  • messageFormat: "AVRO_CONFLUENT_WIRE_FORMAT".

  • schemaFormat: uno dei seguenti valori:
    • "SINGLE_SCHEMA_FILE": lo schema del messaggio è definito in un file di schema Avro. Specifica la posizione Cloud Storage del file dello schema nel parametro confluentAvroSchemaPath.
    • "SCHEMA_REGISTRY": i messaggi sono codificati utilizzando Confluent Schema Registry. Specifica l'URL dell'istanza di Confluent Schema Registry nel parametro schemaRegistryConnectionUrl e specifica la modalità di autenticazione nel parametro schemaRegistryAuthenticationMode.

Per saperne di più su questo formato, consulta Wire format nella documentazione di Confluent.

Formato file di output

Il formato del file di output è lo stesso del messaggio Kafka di input. Ad esempio, se selezioni JSON per il formato del messaggio Kafka, i file JSON vengono scritti nel bucket Cloud Storage di output.

Autenticazione

Il modello Apache Kafka a Cloud Storage supporta l'autenticazione SASL/PLAIN ai broker Kafka.

Parametri del modello

Parametri obbligatori

  • readBootstrapServerAndTopic: argomento Kafka da cui leggere l'input.
  • outputDirectory: il percorso e il prefisso del nome file per la scrittura dei file di output. Deve terminare con una barra. Ad esempio, gs://your-bucket/your-path/.
  • kafkaReadAuthenticationMode: la modalità di autenticazione da utilizzare con il cluster Kafka. Utilizza KafkaAuthenticationMethod.NONE per nessuna autenticazione, KafkaAuthenticationMethod.SASL_PLAIN per nome utente e password SASL/PLAIN, KafkaAuthenticationMethod.SASL_SCRAM_512 per l'autenticazione SASL_SCRAM_512 e KafkaAuthenticationMethod.TLS per l'autenticazione basata su certificato. KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS deve essere utilizzato solo per il cluster Google Cloud Apache Kafka per BigQuery, in quanto consente l'autenticazione utilizzando le credenziali predefinite dell'applicazione.
  • messageFormat: il formato dei messaggi Kafka da leggere. I valori supportati sono AVRO_CONFLUENT_WIRE_FORMAT (Avro codificato nel registro degli schemi Confluent), AVRO_BINARY_ENCODING (Avro binario semplice) e JSON. Il valore predefinito è AVRO_CONFLUENT_WIRE_FORMAT.
  • useBigQueryDLQ: se è true, i messaggi non riusciti verranno scritti in BigQuery con informazioni aggiuntive sugli errori. Il valore predefinito è false.

Parametri facoltativi

  • windowDuration: la durata/dimensione della finestra in cui i dati verranno scritti in Cloud Storage. I formati consentiti sono: Ns (per i secondi, ad esempio 5s), Nm (per i minuti, ad esempio 12m), Nh (per le ore, ad esempio 2h). Ad esempio, 5m. Il valore predefinito è 5 minuti.
  • outputFilenamePrefix: il prefisso da inserire in ogni file in finestra. Ad esempio, output-. Il valore predefinito è output.
  • numShards: il numero massimo di shard di output prodotti durante la scrittura. Un numero maggiore di shard significa una velocità effettiva più elevata per la scrittura in Cloud Storage, ma potenzialmente un costo di aggregazione dei dati più elevato tra gli shard durante l'elaborazione dei file Cloud Storage di output. Il valore predefinito viene deciso da Dataflow.
  • enableCommitOffsets: esegue il commit degli offset dei messaggi elaborati in Kafka. Se attivata, questa opzione riduce al minimo le lacune o l'elaborazione duplicata dei messaggi al riavvio della pipeline. Richiede la specifica dell'ID gruppo di consumer. Il valore predefinito è false.
  • consumerGroupId: l'identificatore univoco del gruppo di consumer a cui appartiene questa pipeline. Obbligatorio se l'opzione Commit Offsets to Kafka è attiva. Il valore predefinito è vuoto.
  • kafkaReadOffset: il punto di partenza per la lettura dei messaggi quando non esistono offset di cui è stato eseguito il commit. Il meno recente è quello iniziale, mentre il più recente viene calcolato partendo dal messaggio più recente. Il valore predefinito è: latest.
  • kafkaReadUsernameSecretId: l'ID secret di Google Cloud Secret Manager che contiene il nome utente Kafka da utilizzare con l'autenticazione SASL_PLAIN. Ad esempio: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. Il valore predefinito è vuoto.
  • kafkaReadPasswordSecretId: l'ID secret di Google Cloud Secret Manager che contiene la password Kafka da utilizzare con l'autenticazione SASL_PLAIN. Ad esempio: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. Il valore predefinito è vuoto.
  • kafkaReadKeystoreLocation: il percorso Google Cloud Storage del file Java KeyStore (JKS) contenente il certificato TLS e la chiave privata da utilizzare per l'autenticazione con il cluster Kafka. Ad esempio, gs://your-bucket/keystore.jks.
  • kafkaReadTruststoreLocation: il percorso Google Cloud Storage del file Java TrustStore (JKS) contenente i certificati attendibili da utilizzare per verificare l'identità del broker Kafka.
  • kafkaReadTruststorePasswordSecretId: l'ID secret di Google Cloud Secret Manager che contiene la password da utilizzare per accedere al file Java TrustStore (JKS) per l'autenticazione TLS di Kafka. Ad esempio, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadKeystorePasswordSecretId: l'ID secret di Google Cloud Secret Manager che contiene la password da utilizzare per accedere al file Java KeyStore (JKS) per l'autenticazione TLS di Kafka. Ad esempio, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadKeyPasswordSecretId: l'ID secret di Google Cloud Secret Manager che contiene la password da utilizzare per accedere alla chiave privata all'interno del file Java KeyStore (JKS) per l'autenticazione TLS di Kafka. Ad esempio, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadSaslScramUsernameSecretId: l'ID secret di Google Cloud Secret Manager che contiene il nome utente Kafka da utilizzare con l'autenticazione SASL_SCRAM. Ad esempio: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadSaslScramPasswordSecretId: l'ID secret di Google Cloud Secret Manager che contiene la password Kafka da utilizzare con l'autenticazione SASL_SCRAM. Ad esempio: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadSaslScramTruststoreLocation: il percorso Google Cloud Storage del file Java TrustStore (JKS) contenente i certificati attendibili da utilizzare per verificare l'identità del broker Kafka.
  • kafkaReadSaslScramTruststorePasswordSecretId: l'ID secret di Google Cloud Secret Manager che contiene la password da utilizzare per accedere al file Java TrustStore (JKS) per l'autenticazione SASL_SCRAM di Kafka. Ad esempio, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • schemaFormat: il formato dello schema Kafka. Può essere fornito come SINGLE_SCHEMA_FILE o SCHEMA_REGISTRY. Se SINGLE_SCHEMA_FILE è specificato, utilizza lo schema menzionato nel file dello schema Avro per tutti i messaggi. Se viene specificato SCHEMA_REGISTRY, i messaggi possono avere uno o più schemi. Il valore predefinito è SINGLE_SCHEMA_FILE.
  • confluentAvroSchemaPath: il percorso Google Cloud Storage al singolo file dello schema Avro utilizzato per decodificare tutti i messaggi in un argomento. Il valore predefinito è vuoto.
  • schemaRegistryConnectionUrl: l'URL dell'istanza di Confluent Schema Registry utilizzata per gestire gli schemi Avro per la decodifica dei messaggi. Il valore predefinito è vuoto.
  • binaryAvroSchemaPath: il percorso Google Cloud Storage al file dello schema Avro utilizzato per decodificare i messaggi Avro con codifica binaria. Il valore predefinito è vuoto.
  • schemaRegistryAuthenticationMode: modalità di autenticazione del registro di schema. Può essere NONE, TLS o OAUTH. Il valore predefinito è NONE.
  • schemaRegistryTruststoreLocation: posizione del certificato SSL in cui è memorizzato l'archivio di attendibilità per l'autenticazione in Schema Registry. Ad esempio, /your-bucket/truststore.jks.
  • schemaRegistryTruststorePasswordSecretId: SecretId in Secret Manager in cui è archiviata la password per accedere al secret nel truststore. Ad esempio, projects/your-project-number/secrets/your-secret-name/versions/your-secret-version.
  • schemaRegistryKeystoreLocation: percorso dell'archivio chiavi contenente il certificato SSL e la chiave privata. Ad esempio, /your-bucket/keystore.jks.
  • schemaRegistryKeystorePasswordSecretId: SecretId in Secret Manager in cui si trova la password per accedere al file del keystore. Ad esempio, projects/your-project-number/secrets/your-secret-name/versions/your-secret-version.
  • schemaRegistryKeyPasswordSecretId: SecretId della password richiesta per accedere alla chiave privata del cliente archiviata nel keystore. Ad esempio, projects/your-project-number/secrets/your-secret-name/versions/your-secret-version.
  • schemaRegistryOauthClientId: ID client utilizzato per autenticare il client Schema Registry in modalità OAUTH. Obbligatorio per il formato del messaggio AVRO_CONFLUENT_WIRE_FORMAT.
  • schemaRegistryOauthClientSecretId: l'ID secret di Google Cloud Secret Manager che contiene il client secret da utilizzare per autenticare il client Schema Registry in modalità OAUTH. Obbligatorio per il formato del messaggio AVRO_CONFLUENT_WIRE_FORMAT. Ad esempio, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • schemaRegistryOauthScope: l'ambito del token di accesso utilizzato per autenticare il client del registro degli schemi in modalità OAUTH. Questo campo è facoltativo, in quanto la richiesta può essere effettuata senza passare un parametro di ambito. Ad esempio, openid.
  • schemaRegistryOauthTokenEndpointUrl: l'URL basato su HTTP(S) per il provider di identità OAuth/OIDC utilizzato per autenticare il client Schema Registry in modalità OAUTH. Obbligatorio per il formato del messaggio AVRO_CONFLUENT_WIRE_FORMAT.
  • outputDeadletterTable: nome della tabella BigQuery completo per i messaggi non recapitati. I messaggi che non sono stati inseriti nella tabella di output per diversi motivi (ad es. schema non corrispondente, JSON in formato errato) vengono scritti in questa tabella. La tabella verrà creata dal modello. Ad esempio, your-project-id:your-dataset.your-table-name.

Esegui il modello

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.

  5. Dal menu a discesa Modello di dataflow, seleziona the Kafka to Cloud Storage template.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. (Facoltativo) Per passare dall'elaborazione exactly-once alla modalità di streaming almeno una volta, seleziona Almeno una volta.
  8. Fai clic su Esegui job.

gcloud

Nella shell o nel terminale, esegui il modello:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Gcs_Flex \
    --parameters \
readBootstrapServerAndTopic=BOOTSTRAP_SERVER_AND_TOPIC,\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
messageFormat=JSON,\
outputDirectory=gs://STORAGE_BUCKET_NAME,\
useBigQueryDLQ=false
  

Sostituisci quanto segue:

  • PROJECT_ID: l'ID progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome univoco del job a tua scelta
  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • BOOTSTRAP_SERVER_AND_TOPIC: l'indirizzo e l'argomento del server bootstrap Apache Kafka

    Il formato dell'indirizzo e dell'argomento del server bootstrap dipende dal tipo di cluster:

    • Cluster Managed Service per Apache Kafka: projects/PROJECT_ID/locations/REGION_NAME/clusters/CLUSTER_NAME/topics/TOPIC_NAME
    • Cluster Kafka esterno: BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
  • STORAGE_BUCKET_NAME: il bucket Cloud Storage in cui viene scritto l'output

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "readBootstrapServerAndTopic": "BOOTSTRAP_SERVER_AND_TOPIC",
          "kafkaReadAuthenticationMode": "APPLICATION_DEFAULT_CREDENTIALS",
          "messageFormat": "JSON",
          "outputDirectory": "gs://STORAGE_BUCKET_NAME",
          "useBigQueryDLQ": "false"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Gcs_Flex",
   }
}
  

Sostituisci quanto segue:

  • PROJECT_ID: l'ID progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome univoco del job a tua scelta
  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • BOOTSTRAP_SERVER_AND_TOPIC: l'indirizzo e l'argomento del server bootstrap Apache Kafka

    Il formato dell'indirizzo e dell'argomento del server bootstrap dipende dal tipo di cluster:

    • Cluster Managed Service per Apache Kafka: projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_NAME/topics/TOPIC_NAME
    • Cluster Kafka esterno: BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
  • STORAGE_BUCKET_NAME: il bucket Cloud Storage in cui viene scritto l'output

Passaggi successivi