Plantilla de Apache Kafka a la planilla de BigQuery

La plantilla de Apache Kafka a BigQuery es una canalización de transmisión que transfiere datos de texto de clústeres del Servicio administrado de Google Cloud para Apache Kafka y, luego, envía los registros resultantes a las tablas de BigQuery. Cualquier error que ocurra cuando se insertan datos en la tabla de resultados se inserta en una tabla de errores independiente en BigQuery.

También puedes usar la plantilla de Apache Kafka a BigQuery con Kafka externa o autoadministrada.

Requisitos de la canalización

  • El servidor del agente de Apache Kafka debe estar en ejecución y se debe poder acceder a él desde las máquinas de trabajador de Dataflow.
  • Los temas de Apache Kafka deben existir.
  • Debes habilitar las API de Dataflow, BigQuery y Cloud Storage. Si se requiere autenticación, también debes habilitar la API de Secret Manager.
  • Crea un conjunto de datos y una tabla de BigQuery con el esquema adecuado para tu tema de entrada de Kafka. Si usas varios esquemas en el mismo tema y quieres escribir en varias tablas, no necesitas crear la tabla antes de configurar la canalización.
  • Cuando la cola de mensajes no entregados (mensajes no procesados) de la plantilla esté habilitada, crea una tabla vacía que no tenga un esquema para la cola de mensajes no entregados.

Formato del mensaje de Kafka

Esta plantilla admite la lectura de mensajes de Kafka en los siguientes formatos:

Formato JSON

Para leer mensajes JSON, establece el parámetro de plantilla messageFormat en "JSON".

Codificación binaria de Avro

Para leer mensajes binarios de Avro, establece los siguientes parámetros de plantilla:

  • messageFormat: "AVRO_BINARY_ENCODING".
  • binaryAvroSchemaPath: Es la ubicación de un archivo de esquema de Avro en Cloud Storage. Ejemplo: gs://BUCKET_NAME/message-schema.avsc.

Para obtener más información sobre el formato binario de Avro, consulta Codificación binaria en la documentación de Apache Avro.

Avro codificado en el registro de esquemas de Confluent

Para leer mensajes en Avro codificados en el registro de esquemas de Confluent, establece los siguientes parámetros de plantilla:

  • messageFormat: "AVRO_CONFLUENT_WIRE_FORMAT".

  • schemaFormat: Uno de los siguientes valores:
    • "SINGLE_SCHEMA_FILE": El esquema del mensaje se define en un archivo de esquema de Avro. Especifica la ubicación de Cloud Storage del archivo de esquema en el parámetro confluentAvroSchemaPath.
    • "SCHEMA_REGISTRY": Los mensajes se codifican con el registro de esquemas de Confluent. Especifica la URL de la instancia de Confluent Schema Registry en el parámetro schemaRegistryConnectionUrl y el modo de autenticación en el parámetro schemaRegistryAuthenticationMode.

Para obtener más información sobre este formato, consulta Formato de transferencia en la documentación de Confluent.

Autenticación

La plantilla de Apache Kafka a BigQuery admite la autenticación SASL/PLAIN para los agentes de Kafka.

Parámetros de la plantilla

Parámetros obligatorios

  • readBootstrapServerAndTopic: El tema de Kafka desde el que se lee la entrada.
  • writeMode: Escribe registros en una tabla o varias tablas (según el esquema). El modo DYNAMIC_TABLE_NAMES solo es compatible con el formato de mensaje de origen AVRO_CONFLUENT_WIRE_FORMAT y la fuente del esquema SCHEMA_REGISTRY. El nombre de la tabla de destino se genera automáticamente según el nombre del esquema de Avro de cada mensaje. Puede ser un solo esquema (crea una sola tabla) o varios esquemas (crea varias tablas). El modo SINGLE_TABLE_NAME escribe en una sola tabla (esquema único) especificada por el usuario. La configuración predeterminada es SINGLE_TABLE_NAME.
  • kafkaReadAuthenticationMode: Es el modo de autenticación que se usará con el clúster de Kafka. Usa KafkaAuthenticationMethod.NONE para la no autenticación, KafkaAuthenticationMethod.SASL_PLAIN para nombre de usuario y contraseña de SASL/PLAIN, KafkaAuthenticationMethod.SASL_SCRAM_512 para autenticación SASL_SCRAM_512 y KafkaAuthenticationMethod.TLS para autenticación basada en certificados. KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS solo se debe usar para el clúster de Google Cloud Apache Kafka para BigQuery, ya que permite la autenticación con credenciales predeterminadas de la aplicación.
  • messageFormat: Es el formato de los mensajes de Kafka que se leerán. Los valores admitidos son AVRO_CONFLUENT_WIRE_FORMAT (Avro codificado del registro de esquemas de Confluent), AVRO_BINARY_ENCODING (Avro binario sin formato) y JSON. La configuración predeterminada es: AVRO_CONFLUENT_WIRE_FORMAT.
  • useBigQueryDLQ: Si es verdadero, los mensajes con errores se escribirán en BigQuery con información adicional del error. La configuración predeterminada es "false".

Parámetros opcionales

  • outputTableSpec: Ubicación de la tabla de BigQuery en la que se escribirá el resultado. El nombre debe tener el formato <project>:<dataset>.<table_name>. El esquema de la tabla debe coincidir con los objetos de entrada.
  • persistKafkaKey: Si es verdadero, la canalización conservará la clave del mensaje de Kafka en la tabla de BigQuery, en un campo _key de tipo BYTES. El valor predeterminado es false (la clave se ignora).
  • outputProject: Proyecto de salida de BigQuery en el que reside el conjunto de datos. Las tablas se crearán de forma dinámica en el conjunto de datos. La configuración predeterminada es vacía.
  • outputDataset: Es el conjunto de datos de salida de BigQuery en el que se escribirá el resultado. Las tablas se crearán de forma dinámica en el conjunto de datos. Si las tablas se crean con anticipación, los nombres de las tablas deben seguir la convención de nombres especificada. El nombre debe ser bqTableNamePrefix + Avro Schema FullName , cada palabra estará separada por un guion -. La configuración predeterminada es vacía.
  • bqTableNamePrefix: Prefijo de nombre que se usará mientras se crean las tablas de salida de BigQuery. Solo se aplica cuando se usa el registro de esquemas. La configuración predeterminada es vacía.
  • createDisposition: CreateDisposition de BigQuery. Por ejemplo: CREATE_IF_NEEDED, CREATE_NEVER. La configuración predeterminada es CREATE_IF_NEEDED.
  • writeDisposition: WriteDisposition de BigQuery. Por ejemplo, WRITE_APPEND, WRITE_EMPTY o WRITE_TRUNCATE. La configuración predeterminada es: WRITE_APPEND.
  • useAutoSharding: Si es verdadero, la canalización usa la fragmentación automática cuando se escribe en BigQuery. El valor predeterminado es true.
  • numStorageWriteApiStreams: Especifica la cantidad de transmisiones de escritura. Se debe configurar este parámetro. La ruta predeterminada es 0.
  • storageWriteApiTriggeringFrequencySec: Especifica la frecuencia de activación en segundos. Se debe establecer este parámetro. El tiempo predeterminado es 5 segundos.
  • useStorageWriteApiAtLeastOnce: Este parámetro solo se aplica si “Usar la API de BigQuery Storage Write” está habilitada. Si se habilita, se usará la semántica de “al menos una vez” para la API de Storage Write; de lo contrario, se usará la semántica de “exactamente una vez”. La configuración predeterminada es "false".
  • enableCommitOffsets: Confirma los desplazamientos de los mensajes procesados en Kafka. Si se habilita, esto minimizará las brechas o el procesamiento duplicado de los mensajes cuando se reinicie la canalización. Requiere especificar el ID del grupo de consumidores. La configuración predeterminada es "false".
  • consumerGroupId: Es el identificador único del grupo de consumidores al que pertenece esta canalización. Obligatorio si la confirmación de desplazamientos a Kafka está habilitada. La configuración predeterminada es vacía.
  • kafkaReadOffset: Es el punto de partida para leer mensajes cuando no existen compensaciones confirmadas. El primero comienza desde el principio y el más reciente desde el mensaje más reciente. La configuración predeterminada es: la más reciente.
  • kafkaReadUsernameSecretId: Es el ID del secreto de Google Cloud Secret Manager que contiene el nombre de usuario de Kafka que se usará con la autenticación SASL_PLAIN. Por ejemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. La configuración predeterminada es vacía.
  • kafkaReadPasswordSecretId: Es el ID del secreto de Google Cloud Secret Manager que contiene la contraseña de Kafka que se usará con la autenticación SASL_PLAIN. Por ejemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. La configuración predeterminada es vacía.
  • kafkaReadKeystoreLocation: Es la ruta de acceso de Google Cloud Storage al archivo Java KeyStore (JKS) que contiene el certificado TLS y la clave privada que se usarán para la autenticación con el clúster de Kafka. Por ejemplo, gs://your-bucket/keystore.jks
  • kafkaReadTruststoreLocation: Es la ruta de acceso de Google Cloud Storage al archivo Java TrustStore (JKS) que contiene los certificados de confianza que se usarán para verificar la identidad del agente de Kafka.
  • kafkaReadTruststorePasswordSecretId: Es el ID del secreto de Google Cloud Secret Manager que contiene la contraseña que se usará para acceder al archivo Java TrustStore (JKS) para la autenticación TLS de Kafka. Por ejemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadKeystorePasswordSecretId: Es el ID del secreto de Google Cloud Secret Manager que contiene la contraseña que se usará para acceder al archivo Java KeyStore (JKS) para la autenticación TLS de Kafka. Por ejemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • kafkaReadKeyPasswordSecretId: Es el ID del secreto de Google Cloud Secret Manager que contiene la contraseña que se usará para acceder a la clave privada dentro del archivo Java KeyStore (JKS) para la autenticación TLS de Kafka. Por ejemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • kafkaReadSaslScramUsernameSecretId: Es el ID del secreto de Google Cloud Secret Manager que contiene el nombre de usuario de Kafka que se usará con la autenticación SASL_SCRAM. Por ejemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadSaslScramPasswordSecretId: Es el ID del secreto de Google Cloud Secret Manager que contiene la contraseña de Kafka que se usará con la autenticación SASL_SCRAM. Por ejemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadSaslScramTruststoreLocation: Es la ruta de acceso de Google Cloud Storage al archivo Java TrustStore (JKS) que contiene los certificados de confianza que se usarán para verificar la identidad del agente de Kafka.
  • kafkaReadSaslScramTruststorePasswordSecretId: Es el ID del secreto de Google Cloud Secret Manager que contiene la contraseña que se usará para acceder al archivo Java TrustStore (JKS) para la autenticación SASL_SCRAM de Kafka. Por ejemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • schemaFormat: El formato del esquema de Kafka. Se puede proporcionar como SINGLE_SCHEMA_FILE o SCHEMA_REGISTRY. Si se especifica SINGLE_SCHEMA_FILE, usa el esquema mencionado en el archivo de esquema Avro para todos los mensajes. Si se especifica SCHEMA_REGISTRY, los mensajes pueden tener un solo esquema o varios. La configuración predeterminada es: SINGLE_SCHEMA_FILE.
  • confluentAvroSchemaPath: Es la ruta de acceso de Google Cloud Storage al único archivo de esquema de Avro que se usa para decodificar todos los mensajes de un tema. La configuración predeterminada es vacía.
  • schemaRegistryConnectionUrl: Es la URL de la instancia del registro de esquemas de Confluent que se usa para administrar esquemas de Avro para la decodificación de mensajes. La configuración predeterminada es vacía.
  • binaryAvroSchemaPath: Es la ruta de acceso de Google Cloud Storage al archivo de esquema de Avro que se usa para decodificar mensajes de Avro con codificación binaria. La configuración predeterminada es vacía.
  • schemaRegistryAuthenticationMode: Es el modo de autenticación del registro de esquemas. Puede ser NONE, TLS o OAUTH. La configuración predeterminada es NONE.
  • schemaRegistryTruststoreLocation: Es la ubicación del certificado SSL en la que se almacena el almacén de confianza para la autenticación en el registro de esquemas. Por ejemplo, /your-bucket/truststore.jks
  • schemaRegistryTruststorePasswordSecretId: Es el SecretId en Secret Manager donde se almacena la contraseña para acceder al secreto en el almacén de confianza. Por ejemplo, projects/your-project-number/secrets/your-secret-name/versions/your-secret-version
  • schemaRegistryKeystoreLocation: Es la ubicación del almacén de claves que contiene el certificado SSL y la clave privada. Por ejemplo, /your-bucket/keystore.jks
  • schemaRegistryKeystorePasswordSecretId: Es el SecretId en Secret Manager donde se encuentra la contraseña para acceder al archivo de almacén de claves. Por ejemplo, projects/your-project-number/secrets/your-secret-name/versions/your-secret-version.
  • schemaRegistryKeyPasswordSecretId: Es el SecretId de la contraseña necesaria para acceder a la clave privada del cliente almacenada en el almacén de claves. Por ejemplo, projects/your-project-number/secrets/your-secret-name/versions/your-secret-version.
  • schemaRegistryOauthClientId: Es el ID de cliente que se usa para autenticar el cliente del registro de esquemas en el modo de OAuth. Se requiere para el formato de mensaje AVRO_CONFLUENT_WIRE_FORMAT.
  • schemaRegistryOauthClientSecretId: Es el ID del secreto de Google Cloud Secret Manager que contiene el secreto del cliente que se usará para autenticar el cliente del registro de esquemas en modo OAUTH. Se requiere para el formato de mensaje AVRO_CONFLUENT_WIRE_FORMAT. Por ejemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • schemaRegistryOauthScope: Es el alcance del token de acceso que se usa para autenticar el cliente del registro de esquemas en el modo OAUTH. Este campo es opcional, ya que la solicitud se puede realizar sin que se pase un parámetro de alcance. Por ejemplo, openid
  • schemaRegistryOauthTokenEndpointUrl: Es la URL basada en HTTP(S) del proveedor de identidad de OAuth/OIDC que se usa para autenticar el cliente del registro de esquemas en el modo OAUTH. Se requiere para el formato de mensaje AVRO_CONFLUENT_WIRE_FORMAT.
  • outputDeadletterTable: Es el nombre de la tabla de BigQuery completamente calificado para los mensajes con errores. Los mensajes que no llegaron a la tabla de resultados por diferentes motivos (p. ej., un esquema no coincidente, un archivo JSON con formato incorrecto) se escriben en esta tabla, que se creará con la plantilla. Por ejemplo, your-project-id:your-dataset.your-table-name
  • javascriptTextTransformGcsPath: Es el URI de Cloud Storage del archivo .js que define la función definida por el usuario (UDF) de JavaScript que se usará. Por ejemplo, gs://my-bucket/my-udfs/my_file.js
  • javascriptTextTransformFunctionName: Es el nombre de la función definida por el usuario (UDF) de JavaScript que se usará. Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes: Especifica la frecuencia en minutos con la que se debe volver a cargar la UDF. Si el valor es mayor que 0, Dataflow comprueba de forma periódica el archivo de UDF en Cloud Storage y vuelve a cargar la UDF si el archivo se modifica. Este parámetro te permite actualizar la UDF mientras se ejecuta la canalización, sin necesidad de reiniciar el trabajo. Si el valor es 0, se inhabilita la carga de UDF. El valor predeterminado es 0.

Función definida por el usuario

Para extender esta plantilla, puedes escribir una función definida por el usuario (UDF). La plantilla llama a la UDF para cada elemento de entrada. Las cargas útiles de elementos se serializan como cadenas JSON. Para obtener más información, consulta Crea funciones definidas por el usuario para plantillas de Dataflow.

La plantilla solo admite UDF para mensajes de Kafka con formato JSON. Si los mensajes de Kafka usan el formato Avro, no se invoca la UDF.

Especificación de la función

La UDF tiene la siguiente especificación:

  • Entrada: El valor del registro de Kafka, serializado como una cadena JSON
  • Resultado: Una cadena JSON que coincide con el esquema de la tabla de destino de BigQuery

Ejecuta la plantilla

Console

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Kafka to BigQuery template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Opcional: Para cambiar del procesamiento “exactamente una vez” al modo de transmisión al menos una vez, selecciona Al menos una vez.
  8. Haz clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_BigQuery_Flex \
    --parameters \
readBootstrapServerAndTopic=BOOTSTRAP_SERVER_AND_TOPIC,\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
messageFormat=JSON,\
writeMode=SINGLE_TABLE_NAME,\
outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME,\
useBigQueryDLQ=true,\
outputDeadletterTable=PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME
  

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto Google Cloud en el que deseas ejecutar el trabajo de Dataflow
  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION_NAME: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • BOOTSTRAP_SERVER_AND_TOPIC: La dirección y el tema del servidor de arranque de Apache Kafka

    El formato de la dirección y el tema del servidor de arranque depende del tipo de clúster:

    • Clúster de Managed Service for Apache Kafka: projects/PROJECT_ID/locations/REGION_NAME/clusters/CLUSTER_NAME/topics/TOPIC_NAME
    • Clúster de Kafka externo: BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
  • DATASET_NAME: Es el nombre de tu conjunto de datos de BigQuery.
  • TABLE_NAME: Es el nombre de la tabla de salida de BigQuery.
  • ERROR_TABLE_NAME: Es el nombre de la tabla de BigQuery en la que se escribirán los registros de error.

API

Para ejecutar la plantilla con la API de REST, envía una solicitud POST HTTP. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "readBootstrapServerAndTopic": "BOOTSTRAP_SERVER_AND_TOPIC",
          "kafkaReadAuthenticationMode": "APPLICATION_DEFAULT_CREDENTIALS",
          "messageFormat": "JSON",
          "writeMode": "SINGLE_TABLE_NAME",
          "outputTableSpec": "PROJECT_ID:DATASET_NAME.TABLE_NAME",
          "useBigQueryDLQ": "true",
          "outputDeadletterTable": "PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_BigQuery_Flex",
   }
}
  

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto Google Cloud en el que deseas ejecutar el trabajo de Dataflow
  • JOB_NAME: Es el nombre del trabajo que elijas
  • LOCATION: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • BOOTSTRAP_SERVER_AND_TOPIC: La dirección y el tema del servidor de arranque de Apache Kafka

    El formato de la dirección y el tema del servidor de arranque depende del tipo de clúster:

    • Clúster de Managed Service for Apache Kafka: projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_NAME/topics/TOPIC_NAME
    • Clúster de Kafka externo: BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
  • DATASET_NAME: Es el nombre de tu conjunto de datos de BigQuery.
  • TABLE_NAME: Es el nombre de la tabla de salida de BigQuery.
  • ERROR_TABLE_NAME: Es el nombre de la tabla de BigQuery en la que se escribirán los registros de error.

Para obtener más información, consulta Escribe datos de Kafka en BigQuery con Dataflow.

¿Qué sigue?