Questo documento descrive come scrivere dati di testo da Dataflow a Cloud Storage utilizzando il TextIO
connettore I/O Apache Beam.
Includi la dipendenza della libreria Google Cloud
Per utilizzare il connettore TextIO
con Cloud Storage, includi la seguente dipendenza. Questa libreria fornisce un gestore dello schema per i nomi dei file "gs://"
.
Java
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>${beam.version}</version>
</dependency>
Python
apache-beam[gcp]==VERSION
Vai
import _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"
Per saperne di più, consulta Installa l'SDK Apache Beam.
Abilitare gRPC sul connettore I/O Apache Beam su Dataflow
Puoi connetterti a Cloud Storage utilizzando gRPC tramite il connettore I/O Apache Beam su Dataflow. gRPC è un framework open source per chiamata di procedura remota (RPC) ad alte prestazioni sviluppato da Google che puoi utilizzare per interagire con Cloud Storage.
Per velocizzare le richieste di scrittura del job Dataflow in Cloud Storage, puoi attivare il connettore I/O Apache Beam su Dataflow per utilizzare gRPC.
Riga di comando
- Assicurati di utilizzare la versione 2.55.0 o successive dell'SDK Apache Beam.
- Per eseguire un job Dataflow, utilizza l'opzione della pipeline
--additional-experiments=use_grpc_for_gcs
. Per informazioni sulle diverse opzioni della pipeline, consulta Flag facoltativi.
SDK Apache Beam
- Assicurati di utilizzare la versione 2.55.0 o successive dell'SDK Apache Beam.
-
Per eseguire un job Dataflow, utilizza l'opzione
--experiments=use_grpc_for_gcs
pipeline. Per informazioni sulle diverse opzioni della pipeline, consulta Opzioni di base.
Puoi configurare il connettore I/O Apache Beam su Dataflow per generare metriche correlate a gRPC in Cloud Monitoring. Le metriche correlate a gRPC possono aiutarti a:
- Monitora e ottimizza le prestazioni delle richieste gRPC a Cloud Storage.
- Risolvi i problemi ed esegui il debug.
- Ottieni informazioni sull'utilizzo e sul comportamento della tua applicazione.
Per informazioni su come configurare il connettore Apache Beam I/O su Dataflow per generare metriche correlate a gRPC, consulta Utilizzare le metriche lato client. Se la raccolta di metriche non è necessaria per il tuo caso d'uso, puoi disattivarla. Per istruzioni, vedi Disattivare le metriche lato client.
Parallelismo
Il parallelismo è determinato principalmente dal numero di shard. Per impostazione predefinita, il runner imposta automaticamente questo valore. Per la maggior parte delle pipeline, è consigliabile utilizzare il comportamento predefinito. In questo documento, consulta la sezione Best practice.
Prestazioni
La tabella seguente mostra le metriche di rendimento per la scrittura in Cloud Storage. I carichi di lavoro sono stati eseguiti su un worker e2-standard2
,
utilizzando l'SDK Apache Beam 2.49.0 per Java. Non hanno utilizzato Runner v2.
100 milioni di record | 1 kB | 1 colonna | Velocità effettiva (byte) | Throughput (elementi) |
---|---|---|
Scrittura | 130 MBps | 130.000 elementi al secondo |
Queste metriche si basano su semplici pipeline batch. Sono pensati per confrontare le prestazioni tra i connettori I/O e non sono necessariamente rappresentativi delle pipeline reali. Il rendimento della pipeline Dataflow è complesso e dipende dal tipo di VM, dai dati in fase di elaborazione, dal rendimento delle origini e dei sink esterni e dal codice utente. Le metriche si basano sull'esecuzione dell'SDK Java e non sono rappresentative delle caratteristiche di prestazioni di altri SDK di linguaggio. Per saperne di più, consulta Prestazioni di Beam IO.
Best practice
In generale, evita di impostare un numero specifico di shard. In questo modo, il runner può selezionare un valore appropriato per la tua scala. Per attivare l'autosharding, chiama
.withAutoSharding()
, non.⇉withNumShards⇇(0)
. Se regoli il numero di shard, ti consigliamo di scrivere tra 100 MB e 1 GB per shard. Tuttavia, il valore ottimale potrebbe dipendere dal carico di lavoro.Cloud Storage può scalare fino a un numero molto elevato di richieste al secondo. Tuttavia, se la pipeline presenta picchi elevati nel volume di scrittura, valuta la possibilità di scrivere in più bucket per evitare di sovraccaricare temporaneamente un singolo bucket Cloud Storage.
In generale, la scrittura in Cloud Storage è più efficiente quando ogni scrittura è più grande (1 kB o più). La scrittura di piccoli record in un numero elevato di file può comportare prestazioni peggiori per byte.
Quando generi i nomi dei file, valuta la possibilità di utilizzare nomi non sequenziali, in modo da distribuire il carico. Per saperne di più, consulta la sezione Utilizzare una convenzione di denominazione che distribuisca il carico in modo uniforme tra gli intervalli di chiavi.
Quando assegni un nome ai file, non utilizzare il simbolo @ seguito da un numero o da un asterisco (*). Per saperne di più, consulta la sezione "@*" e "@N" sono specifiche di partizionamento riservate.
Esempio: scrivere file di testo in Cloud Storage
L'esempio seguente crea una pipeline batch che scrive file di testo utilizzando la compressione GZIP:
Java
Per eseguire l'autenticazione in Dataflow, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configura l'autenticazione per un ambiente di sviluppo locale.
Se l'input PCollection
non è vincolato, devi definire una finestra o un
trigger sulla raccolta, quindi specificare le scritture in finestre chiamando
TextIO.Write.withWindowedWrites
.
Python
Per eseguire l'autenticazione in Dataflow, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configura l'autenticazione per un ambiente di sviluppo locale.
Per il percorso di output, specifica un percorso Cloud Storage che includa il nome del bucket e un prefisso del nome file. Ad esempio, se specifichi
gs://my_bucket/output/file
, il connettore TextIO
scrive nel
bucket Cloud Storage denominato my_bucket
e i file di output hanno il prefisso
output/file*
.
Per impostazione predefinita, il connettore TextIO
suddivide i file di output in shard, utilizzando una convenzione di denominazione come questa: <file-prefix>-00000-of-00001
. Se vuoi, puoi
specificare un suffisso del nome file e uno schema di compressione, come mostrato nell'esempio.
Per garantire scritture idempotenti, Dataflow scrive in un file temporaneo
e poi copia il file temporaneo completato nel file finale.
Per controllare dove vengono archiviati questi file temporanei, utilizza il metodo withTempDirectory
.
Passaggi successivi
- Leggi la documentazione dell'API
TextIO
. - Consulta l'elenco dei modelli forniti da Google.