O modelo do Apache Kafka para BigQuery é um pipeline de streaming que ingere dados de texto dos clusters do Google Cloud Managed Service para Apache Kafka e envia os registros resultantes para as tabelas do BigQuery. Todos os erros que ocorrem ao inserir dados na tabela de saída são inseridos em uma tabela de erros separada no BigQuery.
Também é possível usar o modelo Apache Kafka para BigQuery com o Kafka autogerenciado ou externo.
Requisitos de pipeline
- O servidor do agente do Apache Kafka precisa estar em execução e acessível em máquinas de trabalho do Dataflow.
- Os tópicos do Apache Kafka precisam existir.
- Ative as APIs Dataflow, BigQuery e Cloud Storage. Se a autenticação é necessária, você também precisa ativar a API Secret Manager.
- Criar um conjunto de dados e uma tabela do BigQuery com o esquema apropriado para seu tópico de entrada do Kafka Se você estiver usando vários esquemas no mesmo tópico e quiser gravar em várias tabelas, não será necessário criar a tabela antes de configurar o pipeline.
- Quando a fila de mensagens inativas (mensagens não processadas) do modelo estiver ativada, crie uma tabela vazia que não tenha um esquema para a fila.
Formato de mensagem do Kafka
Esse modelo oferece suporte à leitura de mensagens do Kafka nos seguintes formatos:
Formato JSON
Para ler mensagens JSON, defina o parâmetro de modelo messageFormat
como
"JSON"
.
Codificação binária Avro
Para ler mensagens Avro binárias, defina os seguintes parâmetros de modelo:
messageFormat
:"AVRO_BINARY_ENCODING"
.binaryAvroSchemaPath
: o local de um arquivo de esquema Avro no Cloud Storage. Exemplo:gs://BUCKET_NAME/message-schema.avsc
.
Para mais informações sobre o formato binário Avro, consulte Codificação binária na documentação do Apache Avro.
Avro codificado pelo Confluent Schema Registry
Para ler mensagens em Avro codificado pelo Confluent Schema Registry, defina os seguintes parâmetros de modelo:
messageFormat
:"AVRO_CONFLUENT_WIRE_FORMAT"
.schemaFormat
: um dos seguintes valores:"SINGLE_SCHEMA_FILE"
: o esquema da mensagem é definido em um arquivo de esquema Avro. Especifique o local do Cloud Storage do arquivo de esquema no parâmetroconfluentAvroSchemaPath
.-
"SCHEMA_REGISTRY"
: as mensagens são codificadas usando o Confluent Schema Registry. Especifique o URL da instância do Confluent Schema Registry no parâmetroschemaRegistryConnectionUrl
e o modo de autenticação no parâmetroschemaRegistryAuthenticationMode
.
Para mais informações sobre esse formato, consulte Formato de transmissão na documentação da Confluent.
Autenticação
O modelo Apache Kafka para BigQuery oferece suporte à autenticação SASL/PLAIN para agentes Kafka.
Parâmetros do modelo
Parâmetros obrigatórios
- readBootstrapServerAndTopic: o tópico do Kafka a ser usado para ler a entrada.
- writeMode: grava registros em uma ou várias tabelas (com base no esquema). O modo
DYNAMIC_TABLE_NAMES
é compatível apenas com o formato de mensagem de origemAVRO_CONFLUENT_WIRE_FORMAT
e a origem do esquemaSCHEMA_REGISTRY
. O nome da tabela de destino é gerado automaticamente com base no nome do esquema Avro de cada mensagem. Ele pode ser um único esquema (criando uma única tabela) ou vários esquemas (criando várias tabelas). O modoSINGLE_TABLE_NAME
grava em uma única tabela (esquema único) especificada pelo usuário. O padrão éSINGLE_TABLE_NAME
. - kafkaReadAuthenticationMode: o modo de autenticação a ser usado com o cluster do Kafka. Use
KafkaAuthenticationMethod.NONE
para nenhuma autenticação,KafkaAuthenticationMethod.SASL_PLAIN
para nome de usuário e senha SASL/PLAIN,KafkaAuthenticationMethod.SASL_SCRAM_512
para autenticação SASL_SCRAM_512 eKafkaAuthenticationMethod.TLS
para autenticação com base em certificado.KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS
só deve ser usado para o cluster do Google Cloud Apache Kafka para BigQuery. Ele permite a autenticação usando credenciais padrão do aplicativo. - messageFormat: o formato das mensagens do Kafka a serem lidas. Os valores aceitos são
AVRO_CONFLUENT_WIRE_FORMAT
(Avro codificado do Confluent Schema Registry),AVRO_BINARY_ENCODING
(Avro binário simples) eJSON
. O padrão é: AVRO_CONFLUENT_WIRE_FORMAT. - useBigQueryDLQ: se verdadeiro, as mensagens com falha serão gravadas no BigQuery com informações extras sobre o erro. O padrão é: falso.
Parâmetros opcionais
- outputTableSpec: o local da tabela do BigQuery em que a saída será gravada. O nome precisa estar no formato
<project>:<dataset>.<table_name>
. O esquema da tabela precisa corresponder aos objetos de entrada. - persistKafkaKey: se verdadeiro, o pipeline vai manter a chave da mensagem do Kafka na tabela do BigQuery, em um campo
_key
do tipoBYTES
. O padrão éfalse
(a chave é ignorada). - outputProject: projeto de saída do BigQuery em que o conjunto de dados reside. As tabelas serão criadas dinamicamente no conjunto de dados. O padrão é vazio.
- outputDataset: conjunto de dados de saída do BigQuery para gravar a saída. As tabelas serão criadas dinamicamente no conjunto de dados. Se as tabelas forem criadas de antemão, os nomes das tabelas devem seguir a convenção de nomenclatura especificada. O nome precisa ser
bqTableNamePrefix + Avro Schema FullName
, e cada palavra será separada por um hífen-
. O padrão é vazio. - bqTableNamePrefix: prefixo de nomenclatura a ser usado na criação de tabelas de saída do BigQuery. Aplicável apenas ao usar o registro de esquemas. O padrão é vazio.
- createDisposition: CreateDisposition do BigQuery. Por exemplo:
CREATE_IF_NEEDED
,CREATE_NEVER
. O valor padrão é: CREATE_IF_NEEDED. - writeDisposition: WriteDisposition do BigQuery. Por exemplo,
WRITE_APPEND
,WRITE_EMPTY
ouWRITE_TRUNCATE
. O valor padrão é: WRITE_APPEND. - useAutoSharding: se verdadeiro, o pipeline usa a fragmentação automática ao gravar no BigQuery. O valor padrão é
true
. - numStorageWriteApiStreams: especifica o número de streams de gravação. Esse parâmetro precisa ser definido. O padrão é
0
. - storageWriteApiTriggeringFrequencySec: especifica a frequência de acionamento em segundos. Esse parâmetro precisa ser definido. O padrão é 5 segundos.
- useStorageWriteApiAtLeastOnce: esse parâmetro só entra em vigor se a opção "Usar a API BigQuery Storage Write" está ativada. Se ativada, a semântica do tipo "pelo menos uma vez" será usada para a API Storage Write. Caso contrário, a semântica "exatamente uma" será usada. O padrão é: falso.
- enableCommitOffsets: deslocamentos de confirmação de mensagens processadas para o Kafka. Se ativado, isso minimizará as lacunas ou o processamento duplicado de mensagens ao reiniciar o pipeline. Exige especificar o ID do grupo de consumidores. O padrão é: falso.
- consumerGroupId: o identificador exclusivo do grupo de consumidores ao qual esse pipeline pertence. Obrigatório se os deslocamentos de confirmação para Kafka estiverem ativados. O padrão é vazio.
- kafkaReadOffset: o ponto de partida para ler mensagens quando não há deslocamentos confirmados. O mais antigo começa no início, o mais recente a partir da mensagem mais recente. O padrão é: mais recente.
- kafkaReadUsernameSecretId: o ID do secret do Google Cloud Secret Manager que contém o nome de usuário do Kafka a ser usado com a autenticação
SASL_PLAIN
. Por exemplo,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. O padrão é vazio. - kafkaReadPasswordSecretId: o ID do secret do Google Cloud Secret Manager que contém a senha do Kafka a ser usada com a autenticação
SASL_PLAIN
. Por exemplo,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. O padrão é vazio. - kafkaReadKeystoreLocation: o caminho do Google Cloud Storage para o arquivo Java KeyStore (JKS) que contém o certificado TLS e a chave privada a serem usados na autenticação com o cluster do Kafka. Por exemplo,
gs://your-bucket/keystore.jks
. - kafkaReadTruststoreLocation: o caminho do Google Cloud Storage para o arquivo Java TrustStore (JKS) que contém os certificados confiáveis a serem usados para verificar a identidade do agente Kafka.
- kafkaReadTruststorePasswordSecretId: o ID do secret do Google Cloud Secret Manager que contém a senha usada para acessar o arquivo Java TrustStore (JKS) para autenticação TLS do Kafka. Por exemplo,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaReadKeystorePasswordSecretId: o ID do secret do Google Cloud Secret Manager que contém a senha usada para acessar o arquivo Java KeyStore (JKS) para a autenticação TLS do Kafka. Por exemplo,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaReadKeyPasswordSecretId: o ID do secret do Secret Manager do Google Cloud que contém a senha usada para acessar a chave privada no arquivo Java KeyStore (JKS) para a autenticação TLS do Kafka. Por exemplo,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaReadSaslScramUsernameSecretId: o ID do secret do Google Cloud Secret Manager que contém o nome de usuário do Kafka a ser usado com a autenticação
SASL_SCRAM
. Por exemplo,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaReadSaslScramPasswordSecretId: o ID do secret do Google Cloud Secret Manager que contém a senha do Kafka a ser usada com a autenticação
SASL_SCRAM
. Por exemplo,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaReadSaslScramTruststoreLocation: o caminho do Google Cloud Storage para o arquivo Java TrustStore (JKS) que contém os certificados confiáveis a serem usados para verificar a identidade do agente Kafka.
- kafkaReadSaslScramTruststorePasswordSecretId: o ID do secret do Google Cloud Secret Manager que contém a senha usada para acessar o arquivo Java TrustStore (JKS) para autenticação SASL_SCRAM do Kafka. Por exemplo,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - schemaFormat: o formato de esquema do Kafka. Pode ser fornecido como
SINGLE_SCHEMA_FILE
ouSCHEMA_REGISTRY
. SeSINGLE_SCHEMA_FILE
for especificado, use o esquema mencionado no arquivo de esquema Avro para todas as mensagens. SeSCHEMA_REGISTRY
for especificado, as mensagens poderão ter um único esquema ou vários esquemas. O padrão é SINGLE_SCHEMA_FILE. - confluentAvroSchemaPath: o caminho do Google Cloud Storage para o único arquivo de esquema Avro usado para decodificar todas as mensagens em um tópico. O padrão é vazio.
- schemaRegistryConnectionUrl: o URL da instância do Confluent Schema Registry usada para gerenciar esquemas Avro para decodificação de mensagens. O padrão é vazio.
- binaryAvroSchemaPath: o caminho do Google Cloud Storage para o arquivo de esquema do Avro usado para decodificar mensagens Avro codificadas em binário. O padrão é vazio.
- schemaRegistryAuthenticationMode: modo de autenticação do registro de esquema. Pode ser NONE, TLS ou OAUTH. O padrão é: NENHUM.
- schemaRegistryTruststoreLocation: local do certificado SSL em que o Truststore para autenticação no Schema Registry está armazenado. Por exemplo,
/your-bucket/truststore.jks
. - schemaRegistryTruststorePasswordSecretId: SecretId no Secret Manager em que a senha para acessar o secret no truststore é armazenada. Por exemplo,
projects/your-project-number/secrets/your-secret-name/versions/your-secret-version
. - schemaRegistryKeystoreLocation: local do keystore que contém o certificado SSL e a chave privada. Por exemplo,
/your-bucket/keystore.jks
. - schemaRegistryKeystorePasswordSecretId: SecretId no Secret Manager em que a senha para acessar o arquivo do keystore. Por exemplo,
projects/your-project-number/secrets/your-secret-name/versions/your-secret-version
. - schemaRegistryKeyPasswordSecretId: SecretId da senha necessária para acessar a chave privada do cliente armazenada no keystore. Por exemplo,
projects/your-project-number/secrets/your-secret-name/versions/your-secret-version
. - schemaRegistryOauthClientId: ID do cliente usado para autenticar o cliente do registro de esquema no modo OAUTH. Obrigatório para o formato de mensagem AVRO_CONFLUENT_WIRE_FORMAT.
- schemaRegistryOauthClientSecretId: o ID do secret do Secret Manager do Google Cloud que contém a chave secreta do cliente a ser usada para autenticar o cliente do Schema Registry no modo OAUTH. Obrigatório para o formato de mensagem AVRO_CONFLUENT_WIRE_FORMAT. Por exemplo,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - schemaRegistryOauthScope: o escopo do token de acesso usado para autenticar o cliente do Schema Registry no modo OAUTH. Esse campo é opcional, já que a solicitação pode ser feita sem um parâmetro de escopo transmitido. Por exemplo,
openid
. - schemaRegistryOauthTokenEndpointUrl: o URL baseado em HTTP(S) do provedor de identidade OAuth/OIDC usado para autenticar o cliente do Schema Registry no modo OAUTH. Obrigatório para o formato de mensagem AVRO_CONFLUENT_WIRE_FORMAT.
- outputDeadletterTable: nome totalmente qualificado da tabela do BigQuery para mensagens com falha. As mensagens não chegam à tabela de saída por diferentes motivos (por exemplo, esquema incompatível, json incorreto) são gravadas nesta tabela, que é criada pelo modelo. Por exemplo,
your-project-id:your-dataset.your-table-name
. - javascriptTextTransformGcsPath: o URI do Cloud Storage do arquivo .js que define a função JavaScript definida pelo usuário (UDF) a ser usada. Por exemplo,
gs://my-bucket/my-udfs/my_file.js
. - javascriptTextTransformFunctionName: o nome da função JavaScript definida pelo usuário (UDF) a ser usada. Por exemplo, se o código de função do JavaScript for
myTransform(inJson) { /*...do stuff...*/ }
, o nome da função serámyTransform
. Para ver exemplos de UDFs em JavaScript, consulte os exemplos de UDF (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples). - javascriptTextTransformReloadIntervalMinutes: especifica a frequência de recarregamento da UDF em minutos. Se o valor for maior que 0, o Dataflow vai verificar periodicamente o arquivo da UDF no Cloud Storage e vai atualizar a UDF se o arquivo for modificado. Com esse parâmetro, é possível atualizar a UDF enquanto o pipeline está em execução, sem precisar reiniciar o job. Se o valor for
0
, o recarregamento da UDF será desativado. O valor padrão é0
.
Função definida pelo usuário
Também é possível estender esse modelo escrevendo uma função definida pelo usuário (UDF). O modelo chama a UDF para cada elemento de entrada. Os payloads dos elementos são serializados como strings JSON. Para mais informações, consulte Criar funções definidas pelo usuário para modelos do Dataflow.
O modelo oferece suporte a UDFs apenas para mensagens do Kafka formatadas em JSON. Se as mensagens do Kafka usarem o formato Avro, a UDF não será invocada.Especificação da função
A UDF tem a seguinte especificação:
- Entrada: o valor de registro Kafka, serializado como uma string JSON.
- Saída: uma string JSON que corresponde ao esquema da tabela de destino do BigQuery.
Executar o modelo
Console
- Acesse a página Criar job usando um modelo do Dataflow. Acesse Criar job usando um modelo
- No campo Nome do job, insira um nome exclusivo.
- Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é
us-central1
.Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.
- No menu suspenso Modelo do Dataflow, selecione the Kafka to BigQuery template.
- Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
- Opcional: para alternar do processamento "Exatamente uma vez" para o modo de streaming "Pelo menos uma vez", selecione Pelo menos uma vez.
- Cliquem em Executar job.
gcloud
No shell ou no terminal, execute o modelo:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_BigQuery_Flex \ --parameters \ readBootstrapServerAndTopic=BOOTSTRAP_SERVER_AND_TOPIC,\ kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\ messageFormat=JSON,\ writeMode=SINGLE_TABLE_NAME,\ outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME,\ useBigQueryDLQ=true,\ outputDeadletterTable=PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME
Substitua:
PROJECT_ID
: o ID do projeto Google Cloud em que você quer executar o job do DataflowJOB_NAME
: um nome de job de sua escolhaREGION_NAME
: a região onde você quer implantar o job do Dataflow, por exemplo,us-central1
VERSION
: a versão do modelo que você quer usarUse estes valores:
latest
para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00
, para usar uma versão específica do modelo, que pode ser encontrada aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates-REGION_NAME/
BOOTSTRAP_SERVER_AND_TOPIC
: o endereço do servidor de inicialização e o tópico do Apache Kafka.O formato do endereço do servidor de inicialização e do tópico depende do tipo de cluster:
- Cluster do Serviço gerenciado para Apache Kafka:
projects/PROJECT_ID/locations/REGION_NAME/clusters/CLUSTER_NAME/topics/TOPIC_NAME
- Cluster externo do Kafka:
BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
- Cluster do Serviço gerenciado para Apache Kafka:
DATASET_NAME
: o nome do seu conjunto de dados do BigQueryTABLE_NAME
: o nome da tabela de saída do BigQuery.ERROR_TABLE_NAME
: o nome da tabela do BigQuery em que os registros de erro serão gravados.
API
Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a
API e os respectivos escopos de autorização, consulte
projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "readBootstrapServerAndTopic": "BOOTSTRAP_SERVER_AND_TOPIC", "kafkaReadAuthenticationMode": "APPLICATION_DEFAULT_CREDENTIALS", "messageFormat": "JSON", "writeMode": "SINGLE_TABLE_NAME", "outputTableSpec": "PROJECT_ID:DATASET_NAME.TABLE_NAME", "useBigQueryDLQ": "true", "outputDeadletterTable": "PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_BigQuery_Flex", } }
Substitua:
PROJECT_ID
: o ID do projeto Google Cloud em que você quer executar o job do DataflowJOB_NAME
: um nome de job de sua escolhaLOCATION
: a região onde você quer implantar o job do Dataflow, por exemplo,us-central1
VERSION
: a versão do modelo que você quer usarUse estes valores:
latest
para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00
, para usar uma versão específica do modelo, que pode ser encontrada aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates-REGION_NAME/
BOOTSTRAP_SERVER_AND_TOPIC
: o endereço do servidor de inicialização e o tópico do Apache Kafka.O formato do endereço do servidor de inicialização e do tópico depende do tipo de cluster:
- Cluster do Serviço gerenciado para Apache Kafka:
projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_NAME/topics/TOPIC_NAME
- Cluster externo do Kafka:
BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
- Cluster do Serviço gerenciado para Apache Kafka:
DATASET_NAME
: o nome do seu conjunto de dados do BigQueryTABLE_NAME
: o nome da tabela de saída do BigQuery.ERROR_TABLE_NAME
: o nome da tabela do BigQuery em que os registros de erro serão gravados.
Para mais informações, consulte Gravar dados do Kafka no BigQuery com o Dataflow.
A seguir
- Saiba mais sobre os modelos do Dataflow.
- Confira a lista de modelos fornecidos pelo Google.