Apache Kafka è una piattaforma di streaming distribuita open source per pipeline di dati e integrazione dei dati in tempo reale. Fornisce un sistema di streaming efficiente e scalabile per l'utilizzo in una varietà di applicazioni, tra cui:
- Analisi in tempo reale
- Elaborazione dei flussi
- Aggregazione dei log
- Messaggistica distribuita
- Streaming di eventi
Obiettivi
Installa Kafka su un cluster Dataproc HA con ZooKeeper (in questo tutorial indicato come "cluster Dataproc Kafka").
Crea dati fittizi dei clienti, quindi pubblicali in un argomento Kafka.
Crea tabelle Hive parquet e ORC in Cloud Storage per ricevere i dati degli argomenti Kafka in streaming.
Invia un job PySpark per abbonarti e trasmettere in streaming l'argomento Kafka in Cloud Storage in formato Parquet e ORC.
Esegui una query sui dati della tabella Hive in streaming per conteggiare i messaggi Kafka in streaming.
Costi
In questo documento, utilizzi i seguenti componenti fatturabili di Google Cloud:
Per generare una stima dei costi in base all'utilizzo previsto,
utilizza il calcolatore prezzi.
Al termine delle attività descritte in questo documento, puoi evitare l'addebito di ulteriori costi eliminando le risorse che hai creato. Per ulteriori informazioni, vedi Pulizia.
Prima di iniziare
Se non l'hai ancora fatto, crea un progetto Google Cloud .
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataproc, Compute Engine, and Cloud Storage APIs.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataproc, Compute Engine, and Cloud Storage APIs.
- In the Google Cloud console, go to the Cloud Storage Buckets page.
- Click Create.
- On the Create a bucket page, enter your bucket information. To go to the next
step, click Continue.
-
In the Get started section, do the following:
- Enter a globally unique name that meets the bucket naming requirements.
- To add a
bucket label,
expand the Labels section ( ),
click add_box
Add label, and specify a
key
and avalue
for your label.
-
In the Choose where to store your data section, do the following:
- Select a Location type.
- Choose a location where your bucket's data is permanently stored from the Location type drop-down menu.
- If you select the dual-region location type, you can also choose to enable turbo replication by using the relevant checkbox.
- To set up cross-bucket replication, select
Add cross-bucket replication via Storage Transfer Service and
follow these steps:
Set up cross-bucket replication
- In the Bucket menu, select a bucket.
In the Replication settings section, click Configure to configure settings for the replication job.
The Configure cross-bucket replication pane appears.
- To filter objects to replicate by object name prefix, enter a prefix that you want to include or exclude objects from, then click Add a prefix.
- To set a storage class for the replicated objects, select a storage class from the Storage class menu. If you skip this step, the replicated objects will use the destination bucket's storage class by default.
- Click Done.
-
In the Choose how to store your data section, do the following:
- Select a default storage class for the bucket or Autoclass for automatic storage class management of your bucket's data.
- To enable hierarchical namespace, in the Optimize storage for data-intensive workloads section, select Enable hierarchical namespace on this bucket.
- In the Choose how to control access to objects section, select whether or not your bucket enforces public access prevention, and select an access control method for your bucket's objects.
-
In the Choose how to protect object data section, do the
following:
- Select any of the options under Data protection that you
want to set for your bucket.
- To enable soft delete, click the Soft delete policy (For data recovery) checkbox, and specify the number of days you want to retain objects after deletion.
- To set Object Versioning, click the Object versioning (For version control) checkbox, and specify the maximum number of versions per object and the number of days after which the noncurrent versions expire.
- To enable the retention policy on objects and buckets, click the Retention (For compliance) checkbox, and then do the following:
- To enable Object Retention Lock, click the Enable object retention checkbox.
- To enable Bucket Lock, click the Set bucket retention policy checkbox, and choose a unit of time and a length of time for your retention period.
- To choose how your object data will be encrypted, expand the Data encryption section (Data encryption method. ), and select a
- Select any of the options under Data protection that you
want to set for your bucket.
-
In the Get started section, do the following:
- Click Create.
Passaggi del tutorial
Esegui i seguenti passaggi per creare un cluster Dataproc Kafka per leggere un argomento Kafka in Cloud Storage in formato Parquet o ORC.
Copia lo script di installazione di Kafka in Cloud Storage
Lo script dell'kafka.sh
azione di inizializzazione
installa Kafka su un cluster Dataproc.
Sfoglia il codice.
Copia lo script
kafka.sh
dell'azione di inizializzazione nel bucket Cloud Storage. Questo script installa Kafka su un cluster Dataproc.Apri Cloud Shell ed esegui questo comando:
gcloud storage cp gs://goog-dataproc-initialization-actions-REGION/kafka/kafka.sh gs://BUCKET_NAME/scripts/
Effettua le seguenti sostituzioni:
- REGION:
kafka.sh
è archiviato in bucket pubblici con tag regionali in Cloud Storage. Specifica una regione di Compute Engine geograficamente vicina (ad esempious-central1
). - BUCKET_NAME: il nome del bucket Cloud Storage.
- REGION:
Crea un cluster Dataproc Kafka
Apri Cloud Shell, quindi esegui il seguente comando
gcloud dataproc clusters create
per creare un cluster HA cluster Dataproc che installa i componenti Kafka e ZooKeeper:gcloud dataproc clusters create KAFKA_CLUSTER \ --project=PROJECT_ID \ --region=REGION \ --image-version=2.1-debian11 \ --num-masters=3 \ --enable-component-gateway \ --initialization-actions=gs://BUCKET_NAME/scripts/kafka.sh
Note:
- KAFKA_CLUSTER: il nome del cluster, che deve essere univoco all'interno di un progetto. Il nome deve iniziare con una lettera minuscola e può contenere fino a 51 lettere minuscole, numeri e trattini. Non può terminare con un trattino. Il nome di un cluster eliminato può essere riutilizzato.
- PROJECT_ID: il progetto da associare a questo cluster.
- REGION: la regione Compute Engine in cui si troverà il cluster, ad esempio
us-central1
.- Puoi aggiungere il flag facoltativo
--zone=ZONE
per specificare una zona all'interno della regione specificata, ad esempious-central1-a
. Se non specifichi una zona, la funzionalità di posizionamento autozona di Dataproc seleziona una zona con la regione specificata.
- Puoi aggiungere il flag facoltativo
--image-version
: per questo tutorial è consigliata la versione immagine di Dataproc2.1-debian11
. Nota: ogni versione dell'immagine contiene un insieme di componenti preinstallati, incluso il componente Hive utilizzato in questo tutorial (vedi Versioni delle immagini Dataproc supportate).--num-master
: i nodi master3
creano un cluster HA. Il componente Zookeeper, richiesto da Kafka, è preinstallato su un cluster HA.--enable-component-gateway
: attiva il gateway dei componenti Dataproc.- BUCKET_NAME: il nome del bucket Cloud Storage
che contiene lo script di inizializzazione
/scripts/kafka.sh
(vedi Copia lo script di installazione di Kafka in Cloud Storage).
Crea un argomento Kafka custdata
Per creare un argomento Kafka sul cluster Kafka Dataproc:
Utilizza l'utilità SSH per aprire una finestra del terminale sulla VM master del cluster.
Crea un argomento Kafka
custdata
./usr/lib/kafka/bin/kafka-topics.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --create --topic custdata
Note:
KAFKA_CLUSTER: inserisci il nome del tuo cluster Kafka.
-w-0:9092
indica il broker Kafka in esecuzione sulla porta9092
sul nodoworker-0
.Dopo aver creato l'argomento
custdata
, puoi eseguire i seguenti comandi:# List all topics. /usr/lib/kafka/bin/kafka-topics.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --list
# Consume then display topic data. /usr/lib/kafka/bin/kafka-console-consumer.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --topic custdata
# Count the number of messages in the topic. /usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \ --broker-list KAFKA_CLUSTER-w-0:9092 \ --topic custdata # Delete topic. /usr/lib/kafka/bin/kafka-topics.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --delete --topic custdata
Pubblica contenuti nell'argomento Kafka custdata
Il seguente script utilizza lo strumento kafka-console-producer.sh
Kafka per
generare dati dei clienti fittizi in formato CSV.
Copia e incolla lo script nel terminale SSH sul nodo master del cluster Kafka. Premi <return> per eseguire lo script.
for i in {1..10000}; do \ custname="cust name${i}" uuid=$(dbus-uuidgen) age=$((45 + $RANDOM % 45)) amount=$(echo "$(( $RANDOM % 99999 )).$(( $RANDOM % 99 ))") message="${uuid}:${custname},${age},${amount}" echo ${message} done | /usr/lib/kafka/bin/kafka-console-producer.sh \ --broker-list KAFKA_CLUSTER-w-0:9092 \ --topic custdata \ --property "parse.key=true" \ --property "key.separator=:"
Note:
- KAFKA_CLUSTER: il nome del tuo cluster Kafka.
Esegui questo comando Kafka per verificare che l'argomento
custdata
contenga 10.000 messaggi./usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \ --broker-list KAFKA_CLUSTER-w-0:9092 \ --topic custdata
Note:
- KAFKA_CLUSTER: il nome del tuo cluster Kafka.
Output previsto:
custdata:0:10000
Crea tabelle Hive in Cloud Storage
Crea tabelle Hive per ricevere i dati degli argomenti Kafka in streaming.
Esegui i seguenti passaggi per creare tabelle Hive cust_parquet
(parquet) e
cust_orc
(ORC) nel bucket Cloud Storage.
Inserisci il tuo BUCKET_NAME nel seguente script, quindi copia e incolla lo script nel terminale SSH sul nodo master del cluster Kafka, poi premi <return> per creare uno script
~/hivetables.hql
(Hive Query Language).Nel passaggio successivo eseguirai lo script
~/hivetables.hql
per creare tabelle Hive parquet e ORC nel bucket Cloud Storage.cat > ~/hivetables.hql <<EOF drop table if exists cust_parquet; create external table if not exists cust_parquet (uuid string, custname string, age string, amount string) row format delimited fields terminated by ',' stored as parquet location "gs://BUCKET_NAME/tables/cust_parquet"; drop table if exists cust_orc; create external table if not exists cust_orc (uuid string, custname string, age string, amount string) row format delimited fields terminated by ',' stored as orc location "gs://BUCKET_NAME/tables/cust_orc"; EOF
Nel terminale SSH sul nodo master del cluster Kafka, invia il job Hive
~/hivetables.hql
per creare tabelle Hivecust_parquet
(parquet) ecust_orc
(ORC) nel bucket Cloud Storage.gcloud dataproc jobs submit hive \ --cluster=KAFKA_CLUSTER \ --region=REGION \ -f ~/hivetables.hql
Note:
- Il componente Hive è preinstallato sul cluster Dataproc Kafka. Consulta le versioni release 2.1.x per un elenco delle versioni dei componenti Hive incluse nelle immagini 2.1 rilasciate di recente.
- KAFKA_CLUSTER: il nome del tuo cluster Kafka.
- REGION: la regione in cui si trova il cluster Kafka.
Trasmettere in streaming Kafka custdata
alle tabelle Hive
- Esegui questo comando nel terminale SSH sul nodo master del cluster Kafka per installare la libreria
kafka-python
. Per trasmettere i dati degli argomenti Kafka a Cloud Storage è necessario un client Kafka.
pip install kafka-python
Inserisci il tuo BUCKET_NAME, quindi copia e incolla il seguente codice PySpark nel terminale SSH sul nodo master del cluster Kafka e premi <return> per creare un file
streamdata.py
.Lo script si iscrive all'argomento Kafka
custdata
, quindi trasmette i dati alle tabelle Hive in Cloud Storage. Il formato di output, che può essere Parquet o ORC, viene passato allo script come parametro.cat > streamdata.py <<EOF #!/bin/python import sys from pyspark.sql.functions import * from pyspark.sql.types import * from pyspark.sql import SparkSession from kafka import KafkaConsumer def getNameFn (data): return data.split(",")[0] def getAgeFn (data): return data.split(",")[1] def getAmtFn (data): return data.split(",")[2] def main(cluster, outputfmt): spark = SparkSession.builder.appName("APP").getOrCreate() spark.sparkContext.setLogLevel("WARN") Logger = spark._jvm.org.apache.log4j.Logger logger = Logger.getLogger(__name__) rows = spark.readStream.format("kafka") \ .option("kafka.bootstrap.servers", cluster+"-w-0:9092").option("subscribe", "custdata") \ .option("startingOffsets", "earliest")\ .load() getNameUDF = udf(getNameFn, StringType()) getAgeUDF = udf(getAgeFn, StringType()) getAmtUDF = udf(getAmtFn, StringType()) logger.warn("Params passed in are cluster name: " + cluster + " output format(sink): " + outputfmt) query = rows.select (col("key").cast("string").alias("uuid"),\ getNameUDF (col("value").cast("string")).alias("custname"),\ getAgeUDF (col("value").cast("string")).alias("age"),\ getAmtUDF (col("value").cast("string")).alias("amount")) writer = query.writeStream.format(outputfmt)\ .option("path","gs://BUCKET_NAME/tables/cust_"+outputfmt)\ .option("checkpointLocation", "gs://BUCKET_NAME/chkpt/"+outputfmt+"wr") \ .outputMode("append")\ .start() writer.awaitTermination() if __name__=="__main__": if len(sys.argv) < 2: print ("Invalid number of arguments passed ", len(sys.argv)) print ("Usage: ", sys.argv[0], " cluster format") print ("e.g.: ", sys.argv[0], " <cluster_name> orc") print ("e.g.: ", sys.argv[0], " <cluster_name> parquet") main(sys.argv[1], sys.argv[2]) EOF
Nel terminale SSH sul nodo master del cluster Kafka, esegui
spark-submit
per trasmettere i dati in streaming alle tabelle Hive in Cloud Storage.Inserisci il nome del tuo KAFKA_CLUSTER e dell'output FORMAT, quindi copia e incolla il seguente codice nel terminale SSH sul nodo master del cluster Kafka, quindi premi <return> per eseguire il codice e trasmettere in streaming i dati
custdata
di Kafka in formato Parquet alle tue tabelle Hive in Cloud Storage.spark-submit --packages \ org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.3,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3 \ --conf spark.history.fs.gs.outputstream.type=FLUSHABLE_COMPOSITE \ --conf spark.driver.memory=4096m \ --conf spark.executor.cores=2 \ --conf spark.executor.instances=2 \ --conf spark.executor.memory=6144m \ streamdata.py KAFKA_CLUSTER FORMAT
Note:
- KAFKA_CLUSTER: inserisci il nome del tuo cluster Kafka.
- FORMAT: specifica
parquet
oorc
come formato di output. Puoi eseguire il comando in successione per trasmettere in streaming entrambi i formati alle tabelle Hive: ad esempio, nella prima chiamata, specificaparquet
per trasmettere in streaming l'argomento Kafkacustdata
alla tabella parquet Hive; poi, nella seconda chiamata, specifica il formatoorc
per trasmettere in streamingcustdata
alla tabella Hive ORC.
Dopo che l'output standard si interrompe nel terminale SSH, il che indica che tutto il
custdata
è stato trasmesso in streaming, premi <control-c> nel terminale SSH per interrompere il processo.Elenca le tabelle Hive in Cloud Storage.
gcloud storage ls gs://BUCKET_NAME/tables/* --recursive
Note:
- BUCKET_NAME: inserisci il nome del bucket Cloud Storage che contiene le tabelle Hive (vedi Creare tabelle Hive).
Eseguire query sui dati in streaming
Nel terminale SSH sul nodo master del cluster Kafka, esegui il seguente comando
hive
per conteggiare i messaggicustdata
Kafka trasmessi in streaming nelle tabelle Hive in Cloud Storage.hive -e "select count(1) from TABLE_NAME"
Note:
- TABLE_NAME: specifica
cust_parquet
ocust_orc
come nome della tabella Hive.
Snippet di output previsto:
- TABLE_NAME: specifica
...
Status: Running (Executing on YARN cluster with App id application_....)
----------------------------------------------------------------------------------------------
VERTICES MODE STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED
----------------------------------------------------------------------------------------------
Map 1 .......... container SUCCEEDED 1 1 0 0 0 0
Reducer 2 ...... container SUCCEEDED 1 1 0 0 0 0
----------------------------------------------------------------------------------------------
VERTICES: 02/02 [==========================>>] 100% ELAPSED TIME: 9.89 s
----------------------------------------------------------------------------------------------
OK
10000
Time taken: 21.394 seconds, Fetched: 1 row(s)
Esegui la pulizia
Elimina il progetto
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
Elimina risorse
-
Elimina il bucket:
gcloud storage buckets delete BUCKET_NAME
- Elimina il cluster Kafka:
gcloud dataproc clusters delete KAFKA_CLUSTER \ --region=${REGION}