Cómo conectar Pub/Sub a Apache Kafka

En este documento, se describe cómo integrar Apache Kafka y Pub/Sub con el conector de Kafka de Pub/Sub Group.

Acerca del conector de Kafka de Pub/Sub Group

Apache Kafka es una plataforma de código abierto para eventos de transmisión. Se usa comúnmente en arquitecturas distribuidas para permitir la comunicación entre componentes con acoplamiento bajo. Pub/Sub es un servicio administrado para enviar y recibir mensajes de forma asíncrona. Al igual que con Kafka, puedes usar Pub/Sub para comunicarte entre los componentes de tu arquitectura de nube.

El conector de Kafka de Pub/Sub Group te permite integrar estos dos sistemas. Los siguientes conectores se empaquetan en el archivo JAR del conector:

  • El conector receptor lee registros de uno o más temas de Kafka y los publica en Pub/Sub.
  • El conector de origen lee mensajes de un tema de Pub/Sub y los publica en Kafka.

A continuación, se incluyen algunos casos en los que podrías usar el conector de Pub/Sub Group Kafka:

  • Estás migrando una arquitectura basada en Kafka a Google Cloud.
  • Tienes un sistema de frontend que almacena eventos en Kafka fuera deGoogle Cloud, pero también usas Google Cloud para ejecutar algunos de tus servicios de backend, que necesitan recibir los eventos de Kafka.
  • Recopilas registros de una solución de Kafka local y los envías aGoogle Cloud para el análisis de datos.
  • Tienes un sistema de frontend que usa Google Cloud, pero también almacenas datos de forma local con Kafka.

El conector requiere Kafka Connect, que es un framework para transmitir datos entre Kafka y otros sistemas. Para usar el conector, debes ejecutar Kafka Connect junto con tu clúster de Kafka.

En este documento, se supone que estás familiarizado con Kafka y Pub/Sub. Antes de leer este documento, te recomendamos que completes una de las guías de inicio rápido de Pub/Sub.

El conector de Pub/Sub no admite ninguna integración entre Google Cloud IAM y las LCA de Kafka Connect.

Comienza a usar el conector

En esta sección, se te guiará por las siguientes tareas:

  1. Configura el conector de Kafka de Pub/Sub Group.
  2. Envía eventos de Kafka a Pub/Sub.
  3. Envía mensajes de Pub/Sub a Kafka.

Requisitos previos

Instala Kafka

Sigue la guía de inicio rápido de Apache Kafka para instalar un nodo único de Kafka en tu máquina local. Completa estos pasos en la guía de inicio rápido:

  1. Descarga la versión más reciente de Kafka y extráela.
  2. Inicia el entorno de Kafka.
  3. Crea un tema de Kafka.

Autenticar

El conector de Kafka de Pub/Sub Group debe autenticarse con Pub/Sub para enviar y recibir mensajes de Pub/Sub. Para configurar la autenticación, sigue estos pasos:

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  4. To initialize the gcloud CLI, run the following command:

    gcloud init
  5. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Create local authentication credentials for your user account:

    gcloud auth application-default login

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  7. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  8. Install the Google Cloud CLI.

  9. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  10. To initialize the gcloud CLI, run the following command:

    gcloud init
  11. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  12. Create local authentication credentials for your user account:

    gcloud auth application-default login

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  13. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  14. Descarga el archivo JAR del conector

    Descarga el archivo JAR del conector en tu máquina local. Para obtener más información, consulta Adquiere el conector en el archivo README de GitHub.

    Copia los archivos de configuración del conector

    1. Clona o descarga el repositorio de GitHub del conector.

      git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git
      cd java-pubsub-group-kafka-connector
      
    2. Copia el contenido del directorio config en el subdirectorio config de tu instalación de Kafka.

      cp config/* [path to Kafka installation]/config/
      

    Estos archivos contienen parámetros de configuración para el conector.

    Actualiza tu configuración de Kafka Connect

    1. Navega al directorio que contiene el objeto binario de Kafka Connect que descargaste.
    2. En el directorio binario de Kafka Connect, abre el archivo llamado config/connect-standalone.properties en un editor de texto.
    3. Si plugin.path property tiene comentarios, quítalos.
    4. Actualiza plugin.path property para incluir la ruta de acceso al JAR del conector.

      Ejemplo:

      plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
      
    5. Establece la propiedad offset.storage.file.filename en un nombre de archivo local. En el modo independiente, Kafka usa este archivo para almacenar datos de desplazamiento.

      Ejemplo:

      offset.storage.file.filename=/tmp/connect.offsets
      

    Reenvía eventos de Kafka a Pub/Sub

    En esta sección, se describe cómo iniciar el conector de receptor, publicar eventos en Kafka y, luego, leer los mensajes reenviados desde Pub/Sub.

    1. Usa Google Cloud CLI para crear un tema de Pub/Sub con una suscripción.

      gcloud pubsub topics create PUBSUB_TOPIC
      gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC

      Reemplaza lo siguiente:

      • PUBSUB_TOPIC: Es el nombre de un tema de Pub/Sub para recibir los mensajes de Kafka.
      • PUBSUB_SUBSCRIPTION: Es el nombre de una suscripción a Pub/Sub para el tema.
    2. Abre el archivo /config/cps-sink-connector.properties en un editor de texto. Agrega valores para las siguientes propiedades, que están marcadas como "TODO" en los comentarios:

      topics=KAFKA_TOPICS
      cps.project=PROJECT_ID
      cps.topic=PUBSUB_TOPIC

      Reemplaza lo siguiente:

      • KAFKA_TOPICS: Es una lista separada por comas de los temas de Kafka desde los que se debe leer.
      • PROJECT_ID: El Google Cloud proyecto que contiene tu tema de Pub/Sub.
      • PUBSUB_TOPIC: Es el tema de Pub/Sub para recibir los mensajes de Kafka.
    3. Desde el directorio de Kafka, ejecuta el siguiente comando:

      bin/connect-standalone.sh \
        config/connect-standalone.properties \
        config/cps-sink-connector.properties
      
    4. Sigue los pasos de la guía de inicio rápido de Apache Kafka para escribir algunos eventos en tu tema de Kafka.

    5. Usa gcloud CLI para leer los eventos de Pub/Sub.

      gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack

    Reenvía mensajes de Pub/Sub a Kafka

    En esta sección, se describe cómo iniciar el conector de origen, publicar mensajes en Pub/Sub y leer los mensajes reenviados desde Kafka.

    1. Usa gcloud CLI para crear un tema de Pub/Sub con una suscripción.

      gcloud pubsub topics create PUBSUB_TOPIC
      gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC

      Reemplaza lo siguiente:

      • PUBSUB_TOPIC: Es el nombre de un tema de Pub/Sub.
      • PUBSUB_SUBSCRIPTION: Es el nombre de una suscripción a Pub/Sub.
    2. Abre el archivo llamado /config/cps-source-connector.properties en un editor de texto. Agrega valores para las siguientes propiedades, que están marcadas como "TODO" en los comentarios:

      kafka.topic=KAFKA_TOPIC
      cps.project=PROJECT_ID
      cps.subscription=PUBSUB_SUBSCRIPTION

      Reemplaza lo siguiente:

      • KAFKA_TOPIC: Son los temas de Kafka para recibir los mensajes de Pub/Sub.
      • PROJECT_ID: El Google Cloud proyecto que contiene tu tema de Pub/Sub.
      • PUBSUB_TOPIC: Es el tema de Pub/Sub.
    3. Desde el directorio de Kafka, ejecuta el siguiente comando:

      bin/connect-standalone.sh \
        config/connect-standalone.properties \
        config/cps-source-connector.properties
      
    4. Usa gcloud CLI para publicar un mensaje en Pub/Sub.

      gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
    5. Lee el mensaje de Kafka. Sigue los pasos de la guía de inicio rápido de Apache Kafka para leer los mensajes del tema de Kafka.

    Conversión de mensajes

    Un registro de Kafka contiene una clave y un valor, que son arreglos de bytes de longitud variable. De manera opcional, un registro de Kafka también puede tener encabezados, que son pares clave-valor. Un mensaje de Pub/Sub tiene dos partes principales: el cuerpo del mensaje y cero o más atributos clave-valor.

    Kafka Connect usa convertidores para serializar claves y valores hacia y desde Kafka. Para controlar la serialización, establece las siguientes propiedades en los archivos de configuración del conector:

    • key.converter: Es el convertidor que se usa para serializar las claves de registro.
    • value.converter: Es el convertidor que se usa para serializar los valores de los registros.

    El cuerpo de un mensaje de Pub/Sub es un objeto ByteString, por lo que la conversión más eficiente es copiar la carga útil directamente. Por ese motivo, recomendamos usar un convertidor que produzca tipos de datos primitivos (esquema de números enteros, números de punto flotante, cadenas o bytes) siempre que sea posible, para evitar la deserialización y la reserialización del mismo cuerpo del mensaje.

    Conversión de Kafka a Pub/Sub

    El conector del receptor convierte los registros de Kafka en mensajes de Pub/Sub de la siguiente manera:

    • La clave del registro de Kafka se almacena como un atributo llamado "key" en el mensaje de Pub/Sub.
    • De forma predeterminada, el conector descarta cualquier encabezado del registro de Kafka. Sin embargo, si configuras la opción de configuración headers.publish como true, el conector escribe los encabezados como atributos de Pub/Sub. El conector omite los encabezados que superan los límites de Pub/Sub en los atributos de los mensajes.
    • Para los esquemas de números enteros, números de punto flotante, cadenas y bytes, el conector pasa los bytes del valor del registro de Kafka directamente al cuerpo del mensaje de Pub/Sub.
    • En el caso de los esquemas de struct, el conector escribe cada campo como un atributo del mensaje de Pub/Sub. Por ejemplo, si el campo es { "id"=123 }, el mensaje de Pub/Sub resultante tiene un atributo "id"="123". El valor del campo siempre se convierte en una cadena. Los tipos de mapa y struct no son compatibles como tipos de campo dentro de un struct.
    • En el caso de los esquemas de mapa, el conector escribe cada par clave-valor como un atributo del mensaje de Pub/Sub. Por ejemplo, si el mapa es {"alice"=1,"bob"=2}, el mensaje de Pub/Sub resultante tiene dos atributos, "alice"="1" y "bob"="2". Las claves y los valores se convierten en cadenas.

    Los esquemas de mapa y struct tienen algunos comportamientos adicionales:

    • De manera opcional, puedes especificar un campo de struct o una clave de mapa en particular para que sea el cuerpo del mensaje. Para ello, establece la propiedad de configuración messageBodyName. El valor del campo o la clave se almacena como un ByteString en el cuerpo del mensaje. Si no configuras messageBodyName, el cuerpo del mensaje estará vacío para los esquemas de mapa y struct.

    • En el caso de los valores de array, el conector solo admite tipos de array primitivos. La secuencia de valores del array se concatena en un solo objeto ByteString.

    Conversión de Pub/Sub a Kafka

    El conector de origen convierte los mensajes de Pub/Sub en registros de Kafka de la siguiente manera:

    • Clave de registro de Kafka: De forma predeterminada, la clave se establece en null. De manera opcional, puedes especificar un atributo de mensaje de Pub/Sub para usarlo como clave. Para ello, configura la opción de configuración kafka.key.attribute. En ese caso, el conector busca un atributo con ese nombre y establece la clave del registro en el valor del atributo. Si el atributo especificado no está presente, la clave del registro se establece en null.

    • Valor del registro de Kafka. El conector escribe el valor del registro de la siguiente manera:

      • Si el mensaje de Pub/Sub no tiene atributos personalizados, el conector escribe el cuerpo del mensaje de Pub/Sub directamente en el valor del registro de Kafka como un tipo byte[], con el convertidor especificado por value.converter.

      • Si el mensaje de Pub/Sub tiene atributos personalizados y kafka.record.headers es false, el conector escribe una struct en el valor del registro. El struct contiene un campo para cada atributo y un campo llamado "message" cuyo valor es el cuerpo del mensaje de Pub/Sub (almacenado como bytes):

        {
          "message": "<Pub/Sub message body>",
          "<attribute-1>": "<value-1>",
          "<attribute-2>": "<value-2>",
          ....
        }
        

        En este caso, debes usar un value.converter que sea compatible con los esquemas de struct, como org.apache.kafka.connect.json.JsonConverter.

      • Si el mensaje de Pub/Sub tiene atributos personalizados y kafka.record.headers es true, el conector escribe los atributos como encabezados de registros de Kafka. Escribe el cuerpo del mensaje de Pub/Sub directamente en el valor del registro de Kafka como un tipo byte[], con el convertidor especificado por value.converter.

    • Encabezados de registros de Kafka. De forma predeterminada, los encabezados están vacíos, a menos que establezcas kafka.record.headers en true.

    Opciones de configuración

    Además de los parámetros de configuración que proporciona la API de Kafka Connect, el conector de Kafka de Pub/Sub Group admite la configuración de receptor y de origen, como se describe en Parámetros de configuración del conector de Pub/Sub.

    Obtén asistencia

    Si necesitas ayuda, crea un ticket de asistencia. Para preguntas y debates generales, crea un problema en el repositorio de GitHub.

    ¿Qué sigue?