Scrivere messaggi Pub/Sub Lite utilizzando Apache Spark

Il connettore Spark Pub/Sub Lite è una libreria client Java open source che supporta l'utilizzo di Pub/Sub Lite come origine di input e output per Apache Spark Structured Streaming . Il connettore funziona in tutte le distribuzioni Apache Spark, tra cui Dataproc.

Questa guida rapida illustra come:

  • leggere i messaggi da Pub/Sub Lite
  • scrivere messaggi in Pub/Sub Lite

utilizzando PySpark da un cluster Spark Dataproc.

Prima di iniziare

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Pub/Sub Lite, Dataproc, Cloud Storage, Logging APIs.

    Enable the APIs

  5. Install the Google Cloud CLI.
  6. To initialize the gcloud CLI, run the following command:

    gcloud init
  7. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  8. Make sure that billing is enabled for your Google Cloud project.

  9. Enable the Pub/Sub Lite, Dataproc, Cloud Storage, Logging APIs.

    Enable the APIs

  10. Install the Google Cloud CLI.
  11. To initialize the gcloud CLI, run the following command:

    gcloud init

Configura

  1. Crea variabili per il progetto.

    export PROJECT_ID=$(gcloud config get-value project)
    export PROJECT_NUMBER=$(gcloud projects list \
        --filter="projectId:$PROJECT_ID" \
        --format="value(PROJECT_NUMBER)")
  2. Creare un bucket Cloud Storage. I nomi dei bucket Cloud Storage devono essere univoci a livello globale.

    export BUCKET=your-bucket-name
    gcloud storage buckets create gs://$BUCKET
    
  3. Crea un argomento e una sottoscrizione Pub/Sub Lite in una località supportata. Consulta Creare un argomento se utilizzi una prenotazione Pub/Sub Lite.

    export TOPIC=your-lite-topic-id
    export SUBSCRIPTION=your-lite-subscription-id
    export PUBSUBLITE_LOCATION=your-lite-location
    gcloud pubsub lite-topics create $TOPIC \
        --location=$PUBSUBLITE_LOCATION \
        --partitions=2 \
        --per-partition-bytes=30GiB
    gcloud pubsub lite-subscriptions create $SUBSCRIPTION \
        --location=$PUBSUBLITE_LOCATION \
        --topic=$TOPIC
  4. Crea un cluster Dataproc.

    export DATAPROC_REGION=your-dataproc-region
    export CLUSTER_ID=your-dataproc-cluster-id
    gcloud dataproc clusters create $CLUSTER_ID \
       --region $DATAPROC_REGION \
       --image-version 2.1 \
       --scopes 'https://www.googleapis.com/auth/cloud-platform' \
       --enable-component-gateway \
       --bucket $BUCKET
    • --region: una regione Dataproc supportata in cui si trovano l'argomento e l'abbonamento Pub/Sub Lite.
    • --image-version: la versione dell'immagine del cluster, che determina la versione di Apache Spark installata nel cluster. Scegli le versioni di release delle immagini 2.x.x perché il connettore Spark Pub/Sub Lite supporta attualmente Apache Spark 3.x.x.
    • --scopes: abilita l'accesso API ai servizi Google Cloud nello stesso progetto.
    • --enable-component-gateway: abilita l'accesso all'interfaccia utente web di Apache Spark.
    • --bucket: un bucket gestione temporanea Cloud Storage utilizzato per archiviare le dipendenze dei job del cluster, l'output del driver e i file di configurazione del cluster.
  5. Clona il repository della guida rapida e vai alla directory del codice campione:

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    cd python-docs-samples/pubsublite/spark-connector/
    

Scrittura in Pub/Sub Lite

L'esempio seguente:

from pyspark.sql import SparkSession
from pyspark.sql.functions import array, create_map, col, lit, when
from pyspark.sql.types import BinaryType, StringType
import uuid

# TODO(developer):
# project_number = 11223344556677
# location = "us-central1-a"
# topic_id = "your-topic-id"

spark = SparkSession.builder.appName("write-app").getOrCreate()

# Create a RateStreamSource that generates consecutive numbers with timestamps:
# |-- timestamp: timestamp (nullable = true)
# |-- value: long (nullable = true)
sdf = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

# Transform the dataframe to match the required data fields and data types:
# https://github.com/googleapis/java-pubsublite-spark#data-schema
sdf = (
    sdf.withColumn("key", lit("example").cast(BinaryType()))
    .withColumn("data", col("value").cast(StringType()).cast(BinaryType()))
    .withColumnRenamed("timestamp", "event_timestamp")
    # Populate the attributes field. For example, an even value will
    # have {"key1", [b"even"]}.
    .withColumn(
        "attributes",
        create_map(
            lit("key1"),
            array(when(col("value") % 2 == 0, b"even").otherwise(b"odd")),
        ),
    )
    .drop("value")
)

# After the transformation, the schema of the dataframe should look like:
# |-- key: binary (nullable = false)
# |-- data: binary (nullable = true)
# |-- event_timestamp: timestamp (nullable = true)
# |-- attributes: map (nullable = false)
# |    |-- key: string
# |    |-- value: array (valueContainsNull = false)
# |    |    |-- element: binary (containsNull = false)
sdf.printSchema()

query = (
    sdf.writeStream.format("pubsublite")
    .option(
        "pubsublite.topic",
        f"projects/{project_number}/locations/{location}/topics/{topic_id}",
    )
    # Required. Use a unique checkpoint location for each job.
    .option("checkpointLocation", "/tmp/app" + uuid.uuid4().hex)
    .outputMode("append")
    .trigger(processingTime="1 second")
    .start()
)

# Wait 60 seconds to terminate the query.
query.awaitTermination(60)
query.stop()

Per inviare il job di scrittura a Dataproc:

Console

  1. Carica lo script PySpark nel bucket Cloud Storage.
    1. Vai alla console Cloud Storage.
    2. Seleziona il bucket.
    3. Utilizza Carica file per caricare lo script PySpark che intendi utilizzare.
  2. Invia il job al cluster Dataproc:
    1. Vai alla console Dataproc.
    2. Vai ai job.
    3. Fai clic su Invia job.
    4. Inserisci i dettagli del lavoro.
    5. In Cluster, scegli il cluster.
    6. In Job (Job), assegna un nome all'ID job.
    7. Per Tipo di job, scegli PySpark.
    8. Per File python principale, fornisci l'URI di archiviazione gcloud dello script PySpark caricato che inizia con gs://.
    9. Per i file JAR, scegli la versione più recente del connettore Spark da Maven, cerca il file JAR con le dipendenze nelle opzioni di download e copia il link.
    10. Per Argomenti, se utilizzi lo script PySpark completo di GitHub, inserisci --project_number=PROJECT_NUMBER,--location=PUBSUBLITE_LOCATION,--topic_id=TOPIC_ID. Se copi lo script PySpark riportato sopra con i to-do completati, lascia vuoto questo campo.
    11. In Proprietà, inserisci la chiave spark.master e il valore yarn.
    12. Fai clic su Invia.

gcloud

Utilizza il comando gcloud dataproc jobs submit pyspark per inviare il job a Dataproc:

gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
    --region=$DATAPROC_REGION \
    --cluster=$CLUSTER_ID \
    --jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
    --driver-log-levels=root=INFO \
    --properties=spark.master=yarn \
    -- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --topic_id=$TOPIC
  • --region: la regione Dataproc preselezionata.
  • --cluster: il nome del cluster Dataproc.
  • --jars: il file jar uber del connettore Spark Pub/Sub Lite con le dipendenze in un bucket Cloud Storage pubblico. Puoi anche visitare questo link per scaricare il file jar uber con le dipendenze da Maven.
  • --driver-log-levels: imposta il livello di logging su INFO a livello di radice.
  • --properties: utilizza il gestore delle risorse YARN per il master Spark.
  • --: fornisci gli argomenti richiesti dallo script.

Se l'operazione writeStream va a buon fine, dovresti visualizzare messaggi di log come quelli riportati di seguito a livello locale e nella pagina dei dettagli del job nella console Google Cloud:

INFO com.google.cloud.pubsublite.spark.PslStreamWriter: Committed 1 messages for epochId ..

Lettura da Pub/Sub Lite

L'esempio seguente legge i messaggi da un abbonamento Pub/Sub Lite esistente utilizzando l'API readStream. Il connettore emette messaggi conformi allo schema della tabella fisso formattato come spark.sql.Row.

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

# TODO(developer):
# project_number = 11223344556677
# location = "us-central1-a"
# subscription_id = "your-subscription-id"

spark = SparkSession.builder.appName("read-app").master("yarn").getOrCreate()

sdf = (
    spark.readStream.format("pubsublite")
    .option(
        "pubsublite.subscription",
        f"projects/{project_number}/locations/{location}/subscriptions/{subscription_id}",
    )
    .load()
)

sdf = sdf.withColumn("data", sdf.data.cast(StringType()))

query = (
    sdf.writeStream.format("console")
    .outputMode("append")
    .trigger(processingTime="1 second")
    .start()
)

# Wait 120 seconds (must be >= 60 seconds) to start receiving messages.
query.awaitTermination(120)
query.stop()

Per inviare il job di lettura a Dataproc:

Console

  1. Carica lo script PySpark nel bucket Cloud Storage.
    1. Vai alla console Cloud Storage.
    2. Seleziona il bucket.
    3. Utilizza Carica file per caricare lo script PySpark che intendi utilizzare.
  2. Invia il job al cluster Dataproc:
    1. Vai alla console Dataproc.
    2. Vai ai job.
    3. Fai clic su Invia job.
    4. Inserisci i dettagli del lavoro.
    5. In Cluster, scegli il cluster.
    6. In Job (Job), assegna un nome all'ID job.
    7. Per Tipo di job, scegli PySpark.
    8. Per File python principale, fornisci l'URI di archiviazione gcloud dello script PySpark caricato che inizia con gs://.
    9. Per i file JAR, scegli la versione più recente del connettore Spark da Maven, cerca il file JAR con le dipendenze nelle opzioni di download e copia il link.
    10. Per Argomenti, se utilizzi lo script PySpark completo di GitHub, inserisci --project_number=PROJECT_NUMBER,--location=PUBSUBLITE_LOCATION,--subscription_id=SUBSCRIPTION_ID. Se copi lo script PySpark riportato sopra con i to-do completati, lascia vuoto questo campo.
    11. In Proprietà, inserisci la chiave spark.master e il valore yarn.
    12. Fai clic su Invia.

gcloud

Utilizza di nuovo il comando gcloud dataproc jobs submit pyspark per inviare il job a Dataproc:

gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
    --region=$DATAPROC_REGION \
    --cluster=$CLUSTER_ID \
    --jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
    --driver-log-levels=root=INFO \
    --properties=spark.master=yarn \
    -- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --subscription_id=$SUBSCRIPTION
  • --region: la regione Dataproc preselezionata.
  • --cluster: il nome del cluster Dataproc.
  • --jars: il file jar uber del connettore Spark Pub/Sub Lite con le dipendenze in un bucket Cloud Storage pubblico. Puoi anche visitare questo link per scaricare il file jar uber con le dipendenze da Maven.
  • --driver-log-levels: imposta il livello di logging su INFO a livello di radice.
  • --properties: utilizza il gestore delle risorse YARN per il master Spark.
  • --: fornisci gli argomenti richiesti per lo script.

Se l'operazione readStream va a buon fine, dovresti visualizzare messaggi di log come quelli riportati di seguito a livello locale e nella pagina dei dettagli del job nella console Google Cloud:

+--------------------+---------+------+---+----+--------------------+--------------------+----------+
|        subscription|partition|offset|key|data|   publish_timestamp|     event_timestamp|attributes|
+--------------------+---------+------+---+----+--------------------+--------------------+----------+
|projects/50200928...|        0| 89523|  0|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|
|projects/50200928...|        0| 89524|  1|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|
|projects/50200928...|        0| 89525|  2|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|

Riproduci ed elimina i messaggi da Pub/Sub Lite

Le operazioni di ricerca non funzionano quando si legge da Pub/Sub Lite utilizzando il connettore Spark Pub/Sub Lite perché i sistemi Apache Spark eseguono il proprio monitoraggio degli offset all'interno delle partizioni. La soluzione alternativa consiste nel svuotare, eseguire ricerche e riavviare i flussi di lavoro.

Esegui la pulizia

Per evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse utilizzate in questa pagina, segui questi passaggi.

  1. Elimina l'argomento e la sottoscrizione.

    gcloud pubsub lite-topics delete $TOPIC
    gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
    
  2. Elimina il cluster Dataproc.

    gcloud dataproc clusters delete $CLUSTER_ID --region=$DATAPROC_REGION
    
  3. Rimuovi il bucket Cloud Storage.

    gcloud storage rm gs://$BUCKET
    

Passaggi successivi