Sviluppare notebook Apache Beam con il runner interattivo

Utilizza il runner interattivo Apache Beam con i notebook JupyterLab per completare le seguenti attività:

  • Sviluppare le pipeline in modo iterativo.
  • Controlla il grafico della pipeline.
  • Analizza i singoli PCollections in un flusso di lavoro Read–Eval–Print Loop (REPL).

Questi notebook Apache Beam sono resi disponibili tramite notebook gestiti dall'utente di Vertex AI Workbench, un servizio che ospita macchine virtuali di notebook preinstallate con i più recenti framework di data science e machine learning. Dataflow supporta solo le istanze dei blocchi note gestite dall'utente.

Questa guida si concentra sulle funzionalità introdotte dai notebook Apache Beam, ma non mostra come crearne uno. Per ulteriori informazioni su Apache Beam, consulta la guida alla programmazione di Apache Beam.

Supporto e limitazioni

  • I notebook Apache Beam supportano solo Python.
  • I segmenti della pipeline Apache Beam in esecuzione in questi notebook vengono eseguiti in un ambiente di test e non in un runner Apache Beam di produzione. Per avviare i notebook sul servizio Dataflow, esporta le pipeline create nel notebook Apache Beam. Per maggiori dettagli, consulta Avvia job Dataflow da una pipeline creata nel notebook.

Prima di iniziare

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Compute Engine and Notebooks APIs.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  6. Make sure that billing is enabled for your Google Cloud project.

  7. Enable the Compute Engine and Notebooks APIs.

    Enable the APIs

Prima di creare l'istanza del notebook Apache Beam, attiva API aggiuntive per le pipeline che utilizzano altri servizi, come Pub/Sub.

Se non specificato, l'istanza del notebook viene eseguita dall'account di servizio Compute Engine predefinito con il ruolo Editor del progetto IAM. Se il progetto limita esplicitamente i ruoli dell'account di servizio, assicurati che disponga ancora di autorizzazioni sufficienti per eseguire i notebook. Ad esempio, la lettura da un argomento Pub/Sub crea implicitamente una sottoscrizione e il tuo account di servizio ha bisogno di un ruolo Editor Pub/Sub IAM. Al contrario, la lettura da una sottoscrizione Pub/Sub richiede solo un ruolo IAM Pub/Sub Subscriber.

Al termine di questa guida, per evitare che la fatturazione continui, elimina le risorse che hai creato. Per ulteriori dettagli, consulta la sezione Pulizia.

Avvia un'istanza di notebook Apache Beam

  1. Nella console Google Cloud, vai alla pagina Workbench di Dataflow.

    Vai a Workbench

  2. Assicurati di essere nella scheda Blocchi note gestiti dall'utente.

  3. Nella barra degli strumenti, fai clic su Crea nuova.

  4. Nella sezione Ambiente, seleziona Apache Beam per Ambiente.

  5. (Facoltativo) Se vuoi eseguire i notebook su una GPU, nella sezione Tipo di macchina, seleziona un tipo di macchina che supporti le GPU e poi Installa automaticamente il driver GPU NVIDIA. Per ulteriori informazioni, consulta Piattaforme GPU.

  6. Nella sezione Networking, seleziona una subnet per la VM notebook.

  7. (Facoltativo) Se vuoi configurare un'istanza di blocchi note personalizzata, consulta Creare un'istanza di blocchi note gestita dall'utente con proprietà specifiche.

  8. Fai clic su Crea. Dataflow Workbench crea una nuova istanza del notebook Apache Beam.

  9. Dopo aver creato l'istanza di blocco note, il link Apri JupyterLab diventa attivo. Fai clic su Apri JupyterLab.

(Facoltativo) Installa le dipendenze

I notebook Apache Beam sono già dotati di Apache Beam e delle dipendenze dei connettori Google Cloud. Se la pipeline contiene connettori personalizzati o PTransforms personalizzati che dipendono da librerie di terze parti, installali dopo aver creato un'istanza del notebook. Per ulteriori informazioni, consulta la sezione Installare le dipendenze nella documentazione dei notebook gestiti dall'utente.

Blocchi note Apache Beam di esempio

Dopo aver creato un'istanza di blocchi note gestiti dall'utente, apri JupyterLab. Nella scheda File della barra laterale di JupyterLab, la cartella Esempi contiene i blocchi note di esempio. Per ulteriori informazioni sull'utilizzo dei file JupyterLab, consulta Utilizzare i file nella guida utente di JupyterLab.

Sono disponibili i seguenti notebook:

  • Conteggio parole
  • Conteggio parole in streaming
  • Streaming dei dati sulle corse in taxi a New York
  • Apache Beam SQL nei notebook con confronti con le pipeline
  • Apache Beam SQL nei notebook con il runner Dataflow
  • Apache Beam SQL nei notebook
  • Conteggio parole Dataflow
  • Flink interattivo su larga scala
  • RunInference
  • Utilizzare le GPU con Apache Beam
  • Visualizzare i dati

La cartella Tutorial contiene tutorial aggiuntivi che spiegano i fondamenti di Apache Beam. Sono disponibili i seguenti tutorial:

  • Operazioni di base
  • Operazioni element wise
  • Aggregazioni
  • Windows
  • Operazioni I/O
  • Streaming
  • Esercizi finali

Questi notebook includono testo esplicativo e blocchi di codice commentati per aiutarti a comprendere i concetti e l'utilizzo dell'API Apache Beam. I tutorial forniscono anche esercizi per mettere in pratica i concetti.

Le sezioni seguenti utilizzano il codice di esempio del blocco note Conteggio parole in streaming. Gli snippet di codice in questa guida e quelli trovati nel notebook Conteggio parole in streaming potrebbero presentare piccole discrepanze.

Crea un'istanza di notebook

Vai a File > Nuovo > Notebook e seleziona un kernel Apache Beam 2.22 o versioni successive.

I notebook Apache Beam vengono compilati in base al ramo master dell'SDK Apache Beam. Ciò significa che la versione più recente del kernel mostrata nell'interfaccia utente dei notebook potrebbe essere precedente alla versione dell'SDK rilasciata più di recente.

Apache Beam è installato nell'istanza notebook, quindi includi i moduli interactive_runner e interactive_beam nel notebook.

import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

Se il notebook utilizza altre API di Google, aggiungi le seguenti istruzioni di importazione:

from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth

Impostare le opzioni di interattività

La riga seguente imposta il periodo di tempo durante il quale InteractiveRunner registra i dati da un'origine illimitata. In questo esempio, la durata è impostata su 10 minuti.

ib.options.recording_duration = '10m'

Puoi anche modificare il limite di dimensione della registrazione (in byte) per un'origine illimitata utilizzando la proprietà recording_size_limit.

# Set the recording size limit to 1 GB.
ib.options.recording_size_limit = 1e9

Per altre opzioni interattive, consulta la classe interactive_beam.options.

Crea la tua pipeline

Inizializza la pipeline utilizzando un oggetto InteractiveRunner.

options = pipeline_options.PipelineOptions(flags={})

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Set the project to the default project in your current Google Cloud environment.
# The project is used to create a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

p = beam.Pipeline(InteractiveRunner(), options=options)

Leggere e visualizzare i dati

L'esempio seguente mostra una pipeline Apache Beam che crea una sottoscrizione all'argomento Pub/Sub specificato e legge i dati dalla sottoscrizione.

words = p | "read" >> beam.io.ReadFromPubSub(topic="projects/pubsub-public-data/topics/shakespeare-kinglear")

La pipeline conteggia le parole per finestre dall'origine. Crea finestre predefinite con una durata di 10 secondi ciascuna.

windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

Dopo aver suddiviso i dati in finestre, le parole vengono conteggiate per finestra.

windowed_word_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())

Il metodo show() visualizza la PCollection risultante nel notebook.

ib.show(windowed_word_counts, include_window_info=True)

Il metodo show che visualizza una PCollection in formato tabulare.

Puoi restringere l'ambito del set di risultati da show() impostando due parametri facoltativi: n e duration.

  • Imposta n per limitare il set di risultati in modo da mostrare al massimo n elementi, ad esempio 20. Se n non è impostato, il comportamento predefinito è elencare gli elementi acquisiti più recenti fino al termine della registrazione dell'origine.
  • Imposta duration per limitare il set di risultati a un numero specificato di secondi di dati a partire dall'inizio della registrazione dell'origine. Se duration non è impostato, il comportamento predefinito è elencare tutti gli elementi fino al termine della registrazione.

Se sono impostati entrambi i parametri facoltativi, show() si interrompe ogni volta che viene raggiunta una delle soglie. Nell'esempio seguente, show() restituisce al massimo 20 elementi calcolati in base ai dati dei primi 30 secondi delle sorgenti registrate.

ib.show(windowed_word_counts, include_window_info=True, n=20, duration=30)

Per visualizzare le visualizzazioni dei dati, passa visualize_data=True al metodo show(). Puoi applicare più filtri alle visualizzazioni. La seguente visualizzazione consente di filtrare per etichetta e asse:

Il metodo show che visualizza una PCollection come un insieme completo di elementi dell'interfaccia utente filtrabili.

Per garantire la riproducibilità durante la prototipazione delle pipeline di streaming, il metodo show() richiama il riutilizzo dei dati acquisiti per impostazione predefinita. Per modificare questo comportamento e fare in modo che il metodo show() recuperi sempre nuovi dati, imposta interactive_beam.options.enable_capture_replay = False. Inoltre, se aggiungi una seconda origine illimitata al tuo notebook, i dati dell'origine illimitata precedente vengono ignorati.

Un'altra visualizzazione utile nei notebook Apache Beam è un DataFrame Pandas. L'esempio seguente converte prima le parole in minuscolo e poi calcola la frequenza di ciascuna parola.

windowed_lower_word_counts = (windowed_words
   | beam.Map(lambda word: word.lower())
   | "count" >> beam.combiners.Count.PerElement())

Il metodo collect() fornisce l'output in un DataFrame Pandas.

ib.collect(windowed_lower_word_counts, include_window_info=True)

Il metodo collect che rappresenta una PCollection in un DataFrame Pandas.

La modifica e la riesecuzione di una cella è una pratica comune nello sviluppo di notebook. Quando modifichi ed esegui di nuovo una cella in un notebook Apache Beam, la cella non annulla l'azione prevista del codice nella cella originale. Ad esempio, se una cella aggiunge un PTransform a una pipeline, la riesecuzione della cella aggiunge un altro PTransform alla pipeline. Se vuoi cancellare lo stato, riavvia il kernel e poi esegui di nuovo le celle.

Visualizza i dati tramite lo strumento di controllo Beam interattivo

Potresti trovare distratto eseguire l'introspezione dei dati di un PCollection chiamando costantemente show() e collect(), soprattutto quando l'output occupa molto spazio sullo schermo e rende difficile la navigazione nel blocco note. Ti consigliamo inoltre di confrontare più PCollections una accanto all'altra per verificare se una trasformazione funziona come previsto. Ad esempio, quando un PCollection undergoes a transform and produces the other. Per questi casi d'uso, lo strumento di controllo del flusso interattivo è una soluzione pratica.

L'ispettore Beam interattivo è fornito come estensione JupyterLab apache-beam-jupyterlab-sidepanel preinstallata nel blocco note Apache Beam. Con l'estensione, puoi ispezionare in modo interattivo lo stato delle pipeline e dei dati associati a ogni PCollection senza invocare esplicitamente show() o collect().

Puoi aprire lo strumento di controllo in tre modi:

  • Fai clic su Interactive Beam nella barra dei menu in alto di JupyterLab. Nel menu a discesa, locate Open Inspector e fai clic per aprire l'ispezionatore.

    Aprire lo strumento di controllo tramite il menu

  • Utilizza la pagina di avvio. Se non è aperta alcuna pagina di avvio, fai clic su File -> New Launcher per aprirla. Nella pagina del programma di avvio, individua Interactive Beam e fai clic su Open Inspector per aprire l'ispezionatore.

    Aprire lo strumento di controllo tramite Avvio app

  • Utilizza la tavolozza dei comandi. Nella barra dei menu di JupyterLab, fai clic su View > Activate Command Palette. Nella finestra di dialogo, cerca Interactive Beam per visualizzare tutte le opzioni dell'estensione. Fai clic su Open Inspector per aprire l'ispezione.

    Aprire l'ispettore tramite la tavolozza dei comandi

Quando lo strumento di controllo sta per aprirsi:

  • Se è aperto esattamente un notebook, l'ispettore si connette automaticamente.

  • Se non è aperto alcun notebook, viene visualizzata una finestra di dialogo che ti consente di selezionare un kernel.

  • Se sono aperti più notebook, viene visualizzata una finestra di dialogo che ti consente di selezionare la sessione del notebook.

    Seleziona il notebook a cui connetterti

Ti consigliamo di aprire almeno un notebook e di selezionarne un kernel prima di aprire l'ispettore. Se apri un ispezionatore con un kernel prima di aprire un notebook, in un secondo momento, quando apri un notebook per connetterti all'ispettore, devi selezionare Interactive Beam Inspector Session da Use Kernel from Preferred Session. Un ispettore e un notebook sono collegati quando condividono la stessa sessione, non sessioni diverse create dallo stesso kernel. Se selezioni lo stesso kernel da Start Preferred Kernel, viene creata una nuova sessione indipendente dalle sessioni esistenti dei notebook o degli ispettori aperti.

Puoi aprire più ispettori per un notebook aperto e disporli trascinandone le schede liberamente nell'area di lavoro.

Aprire due ispettori e disporli affiancati

La pagina dell'ispettore si aggiorna automaticamente quando esegui le celle nel notebook. La pagina elenca le pipeline e i PCollections definiti nel blocco note collegato. PCollections sono organizzati in base alle pipeline a cui appartengono e puoi comprimirli facendo clic sulla pipeline dell'intestazione.

Per gli elementi delle pipeline e dell'elenco PCollections, al clic l'ispezionatore visualizza le visualizzazioni corrispondenti sul lato destro:

  • Se si tratta di un PCollection, l'ispettore esegue il rendering dei relativi dati (in modo dinamico se i dati continuano a essere inviati per PCollections illimitati) con widget aggiuntivi per ottimizzare la visualizzazione dopo aver fatto clic sul pulsante APPLY.

    Pagina di ispezione

    Poiché l'ispettore e il notebook aperto condividono la stessa sessione del kernel, si bloccano a vicenda. Ad esempio, se il notebook è impegnato a eseguire codice, l'ispettore non si aggiorna finché il notebook non completa l'esecuzione. Al contrario, se vuoi eseguire immediatamente il codice nel tuo notebook mentre l'ispettore visualizza dinamicamente un PCollection, devi fare clic sul pulsante STOP per interrompere la visualizzazione e rilasciare preventivamente il kernel nel notebook.

  • Se si tratta di una pipeline, l'ispettore mostra il grafico della pipeline.

    Pagina di ispezione

Potresti notare pipeline anonime. Queste pipeline hanno PCollections a cui puoi accedere, ma non vengono più richiamate dalla sessione principale. Ad esempio:

p = beam.Pipeline()
pcoll = p | beam.Create([1, 2, 3])

p = beam.Pipeline()

L'esempio precedente crea una pipeline vuota p e una pipeline anonima che contiene un PCollection pcoll. Puoi accedere alla pipeline anonima utilizzando pcoll.pipeline.

Puoi attivare/disattivare la pipeline e l'elenco PCollection per risparmiare spazio per le visualizzazioni di grandi dimensioni. Attiva/disattiva l'elenco a sinistra

Informazioni sullo stato della registrazione di una pipeline

Oltre alle visualizzazioni, puoi anche controllare lo stato della registrazione di una o tutte le pipeline nell'istanza del notebook chiamando describe.

# Return the recording status of a specific pipeline. Leave the parameter list empty to return
# the recording status of all pipelines.
ib.recordings.describe(p)

Il metodo describe() fornisce i seguenti dettagli:

  • Dimensioni totali (in byte) di tutte le registrazioni per la pipeline su disco
  • Ora di inizio del job di registrazione in background (in secondi dall'epoca Unix)
  • Stato attuale della pipeline del job di registrazione in background
  • Variabile Python per la pipeline

Avviare job Dataflow da una pipeline creata nel notebook

  1. (Facoltativo) Prima di utilizzare il notebook per eseguire i job Dataflow, riavvia il kernel, esegui di nuovo tutte le celle e verifica l'output. Se salti questo passaggio, gli stati nascosti nel notebook potrebbero influire sul grafico dei job nell'oggetto pipeline.
  2. Attiva l'API Dataflow.
  3. Aggiungi la seguente istruzione di importazione:

    from apache_beam.runners import DataflowRunner
    
  4. Passa le opzioni della pipeline.

    # Set up Apache Beam pipeline options.
    options = pipeline_options.PipelineOptions()
    
    # Set the project to the default project in your current Google Cloud
    # environment.
    _, options.view_as(GoogleCloudOptions).project = google.auth.default()
    
    # Set the Google Cloud region to run Dataflow.
    options.view_as(GoogleCloudOptions).region = 'us-central1'
    
    # Choose a Cloud Storage location.
    dataflow_gcs_location = 'gs://<change me>/dataflow'
    
    # Set the staging location. This location is used to stage the
    # Dataflow pipeline and SDK binary.
    options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
    
    # Set the temporary location. This location is used to store temporary files
    # or intermediate results before outputting to the sink.
    options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
    
    # If and only if you are using Apache Beam SDK built from source code, set
    # the SDK location. This is used by Dataflow to locate the SDK
    # needed to run the pipeline.
    options.view_as(pipeline_options.SetupOptions).sdk_location = (
        '/root/apache-beam-custom/packages/beam/sdks/python/dist/apache-beam-%s0.tar.gz' %
        beam.version.__version__)
    

    Puoi modificare i valori parametro. Ad esempio, puoi modificare il valore region da us-central1.

  5. Esegui la pipeline con DataflowRunner. Questo passaggio esegue il job sul servizio Dataflow.

    runner = DataflowRunner()
    runner.run_pipeline(p, options=options)
    

    p è un oggetto pipeline di Creazione della pipeline.

Per un esempio su come eseguire questa conversione in un notebook interattivo, consulta il notebook Conteggio parole di Dataflow nell'istanza del notebook.

In alternativa, puoi esportare il notebook come script eseguibile, modificare il file .py generato utilizzando i passaggi precedenti e poi eseguire il deployment della pipeline nel servizio Dataflow.

Salvare il blocco note

Notebooks che crei vengono salvati localmente nell'istanza del blocco note in esecuzione. Se reinizializzi o sposti l'istanza del notebook durante lo sviluppo, i nuovi notebook vengono mantenuti finché vengono creati nella directory /home/jupyter. Tuttavia, se un'istanza di blocco note viene eliminata, vengono eliminati anche i blocchi note.

Per conservare i tuoi notebook per un uso futuro, scaricali localmente sulla tua workstation, salvali su GitHub o esportali in un formato file diverso.

Salvare il notebook su dischi permanenti aggiuntivi

Se vuoi conservare i tuoi lavori, ad esempio notebook e script, in varie istanze di notebook, archiviali in un Persistent Disk.

  1. Crea o collega un disco permanente. Segui le istruzioni per utilizzare ssh per connetterti alla VM dell'istanza del notebook ed emettere comandi in Cloud Shell.

  2. Prendi nota della directory in cui è montato il Persistent Disk, ad esempio/mnt/myDisk.

  3. Modifica i dettagli della VM dell'istanza del notebook per aggiungere una voce a Custom metadata: chiave - container-custom-params; valore - -v /mnt/myDisk:/mnt/myDisk. Metadati aggiuntivi necessari per associare il DP montato

  4. Fai clic su Salva.

  5. Per aggiornare queste modifiche, reimposta l'istanza del notebook. Ripristinare un&#39;istanza di notebook

  6. Dopo il ripristino dei dati di fabbrica, fai clic su Apri JupyterLab. Potrebbe essere necessario del tempo prima che l'interfaccia utente di JupyterLab diventi disponibile. Dopo che viene visualizzata l'interfaccia utente, apri un terminale ed esegui il seguente comando: ls -al /mnt La directory /mnt/myDisk dovrebbe essere elencata. Elenco volumi associati

Ora puoi salvare il tuo lavoro nella directory /mnt/myDisk. Anche se l'istanza del notebook viene eliminata, il Persistent Disk esiste nel progetto. Puoi quindi collegare questo Persistent Disk ad altre istanze del notebook.

Esegui la pulizia

Al termine dell'utilizzo dell'istanza del notebook Apache Beam, ripulisci le risorse che hai creato su Google Cloud spegnendo l'istanza del notebook.

Passaggi successivi