Questo documento descrive come integrare Apache Kafka e Pub/Sub Lite utilizzando il connettore Kafka per gruppi Pub/Sub.
Informazioni sul connettore Kafka per gruppi Pub/Sub
Apache Kafka è una piattaforma open source per lo streaming di eventi. È spesso utilizzato nelle architetture distribuite per abilitare la comunicazione tra componenti a basso accoppiamento. Pub/Sub Lite è un servizio gestito per l'invio e la ricezione di messaggi in modo asincrono. Come per Kafka, puoi utilizzare Pub/Sub Lite per comunicare tra i componenti della tua architettura cloud.
Il connettore Kafka per gruppi Pub/Sub ti consente di integrare questi due sistemi. I seguenti connettori sono pacchettizzati nel file JAR del connettore:
- Il connettore sink legge i record da uno o più argomenti Kafka e li pubblica su Pub/Sub Lite.
- Il connettore di origine legge i messaggi da un argomento Pub/Sub Lite e li pubblica in Kafka.
Di seguito sono riportati alcuni scenari in cui potresti utilizzare il connettore Kafka di gruppo Pub/Sub:
- Stai eseguendo la migrazione di un'architettura basata su Kafka a Google Cloud.
- Hai un sistema frontend che archivia gli eventi in Kafka al di fuori di Google Cloud, ma utilizzi anche Google Cloud per eseguire alcuni dei tuoi servizi di backend, che devono ricevere gli eventi Kafka.
- Raccogli i log da una soluzione Kafka on-premise e li invii a Google Cloud per l'analisi dei dati.
- Hai un sistema frontend che utilizza Google Cloud, ma archivi anche i dati on-premise utilizzando Kafka.
Il connettore richiede Kafka Connect, un framework per lo streaming di dati tra Kafka e altri sistemi. Per utilizzare il connettore, devi eseguire Kafka Connect insieme al tuo cluster Kafka.
Questo documento presuppone che tu conosca sia Kafka sia Pub/Sub Lite. Per iniziare a utilizzare Pub/Sub Lite, consulta Pubblicare e ricevere messaggi in Pub/Sub Lite utilizzando la console Google Cloud.
Inizia a utilizzare il connettore Kafka di gruppo Pub/Sub
In questa sezione vengono illustrate le seguenti attività:- Configura il connettore Kafka di gruppo Pub/Sub.
- Invia gli eventi da Kafka a Pub/Sub Lite.
- Invia messaggi da Pub/Sub Lite a Kafka.
Prerequisiti
Installa Kafka
Segui la guida rapida di Apache Kafka per installare un singolo nodo Kafka sulla tua macchina locale. Completa i seguenti passaggi nella guida introduttiva:
- Scarica la release Kafka più recente ed estraila.
- Avvia l'ambiente Kafka.
- Crea un argomento Kafka.
Autentica
Il connettore Kafka per gruppi Pub/Sub deve autenticarsi con Pub/Sub per poter inviare e ricevere messaggi Pub/Sub. Per configurare l'autenticazione, segui questi passaggi:
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/pubsublite.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/pubsublite.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
Scaricare il file JAR del connettore
Scarica il file JAR del connettore nella macchina locale. Per maggiori informazioni, consulta la sezione Acquista il connettore nel file readme di GitHub.
Copia i file di configurazione del connettore
Clona o scarica il repository GitHub per il connettore.
git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git cd java-pubsub-group-kafka-connector
Copia i contenuti della directory
config
nella sottodirectoryconfig
della tua installazione di Kafka.cp config/* [path to Kafka installation]/config/
Questi file contengono le impostazioni di configurazione per il connettore.
Aggiorna la configurazione di Kafka Connect
- Vai alla directory contenente il file binario di Kafka Connect che hai scaricato.
- Nella directory dei file binari di Kafka Connect, apri il file
config/connect-standalone.properties
in un editor di testo. - Se
plugin.path property
è commentato, rimuovi il commento. Aggiorna
plugin.path property
in modo da includere il percorso del file JAR del connettore.Esempio:
plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
Imposta la proprietà
offset.storage.file.filename
su un nome file locale. In modalità autonoma, Kafka utilizza questo file per archiviare i dati di offset.Esempio:
offset.storage.file.filename=/tmp/connect.offsets
Inoltra gli eventi da Kafka a Pub/Sub Lite
Questa sezione descrive come avviare il connettore di destinazione, pubblicare gli eventi in Kafka e leggere i messaggi inoltrati da Pub/Sub Lite.
Utilizza Google Cloud CLI per creare una prenotazione Pub/Sub Lite.
gcloud pubsub lite-reservations create RESERVATION_NAME \ --location=LOCATION \ --throughput-capacity=4
Sostituisci quanto segue:
- RESERVATION_NAME: il nome della prenotazione Pub/Sub Lite.
- LOCATION: la località della prenotazione.
Utilizza Google Cloud CLI per creare un argomento Pub/Sub Lite con una sottoscrizione.
gcloud pubsub lite-topics create LITE_TOPIC \ --location=LOCATION \ --partitions=2 \ --per-partition-bytes=30GiB \ --throughput-reservation=RESERVATION_NAME gcloud pubsub lite-subscriptions create LITE_SUBSCRIPTION \ --location=LOCATION \ --topic=LITE_TOPIC
Sostituisci quanto segue:
- LITE_TOPIC: il nome dell'argomento Pub/Sub Lite per ricevere i messaggi da Kafka.
- LOCATION: la posizione dell'argomento. Il valore deve corrispondere alla posizione della prenotazione.
- RESERVATION_NAME: il nome della prenotazione Pub/Sub Lite.
- LITE_SUBSCRIPTION: il nome di un abbonamento Pub/Sub Lite per l'argomento.
Apri il file
/config/pubsub-lite-sink-connector.properties
in un editor di testo. Aggiungi i valori per le seguenti proprietà, contrassegnate da"TODO"
nei commenti:topics=KAFKA_TOPICS pubsublite.project=PROJECT_ID pubsublite.location=LOCATION pubsublite.topic=LITE_TOPIC
Sostituisci quanto segue:
- KAFKA_TOPICS: un elenco separato da virgole di argomenti Kafka da cui leggere.
- PROJECT_ID: il progetto Google Cloud che contiene il tuo argomento Pub/Sub Lite.
- LOCATION: la posizione dell'argomento Pub/Sub Lite.
- LITE_TOPIC: l'argomento Pub/Sub Lite per ricevere messaggi da Kafka.
Dalla directory Kafka, esegui il seguente comando:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/pubsub-lite-sink-connector.properties
Segui i passaggi descritti nella guida introduttiva ad Apache Kafka per scrivere alcuni eventi nell'argomento Kafka.
Abbonati alla sottoscrizione Pub/Sub Lite utilizzando uno dei metodi illustrati in Ricezione di messaggi dalle sottoscrizioni Lite.
Inoltra i messaggi da Pub/Sub Lite a Kafka
Questa sezione descrive come avviare il connettore di origine, pubblicare messaggi su Pub/Sub Lite e leggere i messaggi inoltrati da Kafka.
Utilizza Google Cloud CLI per creare una prenotazione Pub/Sub Lite.
gcloud pubsub lite-reservations create RESERVATION_NAME \ --location=LOCATION \ --throughput-capacity=4
Sostituisci quanto segue:
- RESERVATION_NAME: il nome della prenotazione Pub/Sub Lite.
- LOCATION: la località della prenotazione.
Utilizza Google Cloud CLI per creare un argomento Pub/Sub Lite con una sottoscrizione.
gcloud pubsub lite-topics create LITE_TOPIC \ --location=LOCATION \ --partitions=2 \ --per-partition-bytes=30GiB \ --throughput-reservation=RESERVATION_NAME gcloud pubsub lite-subscriptions create LITE_SUBSCRIPTION \ --location=LOCATION \ --topic=LITE_TOPIC
Sostituisci quanto segue:
- LITE_TOPIC: il nome dell'argomento Pub/Sub Lite.
- LOCATION: la posizione dell'argomento. Il valore deve corrispondere alla posizione della prenotazione.
- RESERVATION_NAME: il nome della prenotazione Pub/Sub Lite.
- LITE_SUBSCRIPTION: il nome di un abbonamento Pub/Sub Lite per l'argomento.
Apri il file denominato
/config/pubsub-lite-source-connector.properties
in un editor di testo. Aggiungi i valori per le seguenti proprietà, contrassegnate come"TODO"
nei commenti:topic=KAFKA_TOPIC pubsublite.project=PROJECT_ID pubsublite.location=LOCATION pubsublite.subscription=LITE_SUBSCRIPTION
Sostituisci quanto segue:
- KAFKA_TOPIC: gli argomenti Kafka per ricevere i messaggi Pub/Sub.
- PROJECT_ID: il progetto Google Cloud che contiene lo argomento Pub/Sub.
- LOCATION: la posizione dell'argomento Pub/Sub Lite.
- LITE_SUBSCRIPTION: l'argomento Pub/Sub Lite.
Dalla directory Kafka, esegui il seguente comando:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/pubsub-lite-source-connector.properties
Pubblica i messaggi nell'argomento Pub/Sub Lite utilizzando uno dei metodi illustrati in Pubblicazione di messaggi negli argomenti Lite.
Leggi il messaggio da Kafka. Segui i passaggi descritti nella guida introduttiva ad Apache Kafka per leggere i messaggi dall'argomento Kafka.
Conversione dei messaggi
Un record Kafka contiene una chiave e un valore, che sono array di byte di lunghezza variabile. Se vuoi, un record Kafka può avere anche intestazioni, che sono coppie chiave/valore. Un messaggio Pub/Sub Lite ha i seguenti campi:
key
: chiave del messaggio (bytes
)data
: dati del messaggio (bytes
)attributes
: zero o più attributi. Ogni attributo è una mappa(key,values[])
. Un singolo attributo può avere più valori.event_time
: un timestamp dell'evento facoltativo fornito dall'utente.
Kafka Connect utilizza i convertitori per serializzare chiavi e valori verso e da Kafka. Per controllare la serializzazione, imposta le seguenti proprietà nei file di configurazione del connettore:
key.converter
: il convertitore utilizzato per serializzare le chiavi dei record.value.converter
: il convertitore utilizzato per serializzare i valori dei record.
Conversione da Kafka a Pub/Sub Lite
Il connettore sink converte i record Kafka in messaggi Pub/Sub Lite come segue.
Record Kafka
(SinkRecord ) |
Messaggio Pub/Sub Lite |
---|---|
Chiave | key |
Valore | data |
Intestazioni | attributes |
Timestamp | eventTime |
Tipo di timestamp | attributes["x-goog-pubsublite-source-kafka-event-time-type"] |
Argomento | attributes["x-goog-pubsublite-source-kafka-topic"] |
Partition | attributes["x-goog-pubsublite-source-kafka-offset"] |
Offset | attributes["x-goog-pubsublite-source-kafka-partition"] |
Le chiavi, i valori e le intestazioni vengono codificati come segue:
- Gli schemi null vengono trattati come schemi di stringhe.
- I payload in byte vengono scritti direttamente senza conversione.
- I payload di stringhe, numeri interi e a virgola mobile vengono codificati in una sequenza di byte UTF-8.
- Tutti gli altri payload vengono codificati in un tipo Protocol buffer
Value
e poi convertiti in una stringa di byte.- I campi di stringhe nidificati vengono codificati in un
Value
protobuf. - I campi di byte nidificati vengono codificati in un
Value
protobuf contenente i byte con codifica base64. - I campi numerici nidificati vengono codificati come doppi in un
Value
protobuf. - Le mappe con chiavi array, map o struct non sono supportate.
- I campi di stringhe nidificati vengono codificati in un
Conversione da Pub/Sub Lite a Kafka
Il connettore di origine converte i messaggi Pub/Sub Lite in record Kafka come segue:
Messaggio Pub/Sub Lite | Record Kafka
(SourceRecord )
|
---|---|
key |
Chiave |
data |
Valore |
attributes |
Intestazioni |
event_time |
Timestamp. Se event_time non è presente, viene utilizzato il momento della pubblicazione. |
Opzioni di configurazione
Oltre alle configurazioni fornite dall'API Kafka Connect, il connettore supporta le seguenti configurazioni di Pub/Sub Lite.
Opzioni di configurazione del connettore sink
Il connettore di destinazione supporta le seguenti opzioni di configurazione.
Impostazione | Tipo di dati | Descrizione |
---|---|---|
connector.class |
String |
Obbligatorio. La classe Java per il connettore. Per il connettore di destinazione Pub/Sub Lite, il valore deve essere com.google.pubsublite.kafka.sink.PubSubLiteSinkConnector .
|
gcp.credentials.file.path |
String |
Facoltativo. Il percorso di un file che memorizza le credenziali Google Cloud per l'autenticazione di Pub/Sub Lite. |
gcp.credentials.json |
String |
Facoltativo. Un blob JSON contenente Google Cloud per l'autenticazione di Pub/Sub Lite. |
pubsublite.location |
String |
Obbligatorio. La posizione dell'argomento Pub/Sub Lite. |
pubsublite.project |
String |
Obbligatorio. La piattaforma Google Cloud che contiene l'argomento Pub/Sub Lite. |
pubsublite.topic |
String |
Obbligatorio. L'argomento Pub/Sub Lite in cui pubblicare i record Kafka. |
topics |
String |
Obbligatorio. Un elenco separato da virgole di argomenti Kafka da cui leggere. |
Opzioni di configurazione del connettore di origine
Il connettore di origine supporta le seguenti opzioni di configurazione.
Impostazione | Tipo di dati | Descrizione |
---|---|---|
connector.class |
String |
Obbligatorio. La classe Java per il connettore. Per il connettore dell'origine Pub/Sub Lite, il valore deve essere com.google.pubsublite.kafka.source.PubSubLiteSourceConnector .
|
gcp.credentials.file.path |
String |
Facoltativo. Il percorso di un file che memorizza le credenziali Google Cloud per l'autenticazione di Pub/Sub Lite. |
gcp.credentials.json |
String |
Facoltativo. Un blob JSON contenente Google Cloud per l'autenticazione di Pub/Sub Lite. |
kafka.topic |
String |
Obbligatorio. L'argomento Kafka che riceve messaggi da Pub/Sub Lite. |
pubsublite.location |
String |
Obbligatorio. La posizione dell'argomento Pub/Sub Lite. |
pubsublite.partition_flow_control.bytes |
Long |
Il numero massimo di byte in sospeso per partizione Pub/Sub Lite. Valore predefinito: 20.000.000 |
pubsublite.partition_flow_control.messages |
Long |
Il numero massimo di messaggi in attesa per partizione Pub/Sub Lite. Valore predefinito: |
pubsublite.project |
String |
Obbligatorio. Il progetto Google Cloud che contiene l'argomento Pub/Sub Lite. |
pubsublite.subscription |
String |
Obbligatorio. Il nome della sottoscrizione Pub/Sub Lite da cui eseguire il pull dei messaggi. |
Passaggi successivi
- Comprendi le differenze tra Kafka e Pub/Sub.
- Scopri di più sul connettore Kafka di gruppo Pub/Sub.
- Consulta il repository GitHub del connettore Kafka di gruppo Pub/Sub.