Lire des données depuis Cloud Storage vers Dataflow

Pour lire des données de Cloud Storage vers Dataflow, utilisez le connecteur d'E/S Apache Beam TextIO ou AvroIO.

Inclure la dépendance de la bibliothèque Google Cloud

Pour utiliser le connecteur TextIO ou AvroIO avec Cloud Storage, incluez la dépendance suivante. Cette bibliothèque fournit un gestionnaire de schéma pour les noms de fichiers de type "gs://".

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam.version}</version>
</dependency>

Python

apache-beam[gcp]==VERSION

Go

import _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"

Pour en savoir plus, consultez la section Installer le SDK Apache Beam.

Activer gRPC sur le connecteur d'E/S Apache Beam dans Dataflow

Vous pouvez vous connecter à Cloud Storage à l'aide de gRPC via le connecteur d'E/S Apache Beam sur Dataflow. gRPC est un framework d'appel de procédure à distance (RPC) Open Source hautes performances développé par Google que vous pouvez utiliser pour interagir avec Cloud Storage.

Pour accélérer les requêtes de lecture de votre tâche Dataflow vers Cloud Storage, vous pouvez activer le connecteur d'E/S Apache Beam sur Dataflow pour utiliser gRPC.

Ligne de commande

  1. Assurez-vous d'utiliser le SDK Apache Beam version 2.55.0 ou ultérieure.
  2. Pour exécuter un job Dataflow, utilisez l'option de pipeline --additional-experiments=use_grpc_for_gcs. Pour en savoir plus sur les différentes options de pipeline, consultez la section Options facultatives.

SDK Apache Beam

  1. Assurez-vous d'utiliser le SDK Apache Beam version 2.55.0 ou ultérieure.
  2. Pour exécuter un job Dataflow, utilisez l'option de pipeline --experiments=use_grpc_for_gcs. Pour en savoir plus sur les différentes options de pipeline, consultez la section Options de base.

Vous pouvez configurer le connecteur d'E/S Apache Beam sur Dataflow pour générer des métriques liées à gRPC dans Cloud Monitoring. Les métriques liées à gRPC peuvent vous aider à effectuer les opérations suivantes:

  • Surveillez et optimisez les performances des requêtes gRPC envoyées à Cloud Storage.
  • Dépanner et déboguer les problèmes
  • Obtenez des insights sur l'utilisation et le comportement de votre application.

Pour savoir comment configurer le connecteur d'E/S Apache Beam sur Dataflow afin de générer des métriques liées à gRPC, consultez Utiliser des métriques côté client. Si la collecte de métriques n'est pas nécessaire pour votre cas d'utilisation, vous pouvez choisir de désactiver la collecte de métriques. Pour obtenir des instructions, consultez Désactiver les métriques côté client.

Parallélisme

Les connecteurs TextIO et AvroIO acceptent deux niveaux de parallélisme :

  • Les fichiers individuels sont indexés séparément, de sorte que plusieurs nœuds de calcul différents puissent les lire.
  • Si les fichiers ne sont pas compressés, le connecteur peut lire des sous-plages de chaque fichier séparément, ce qui entraîne un niveau de parallélisme très élevé. Cette division n'est possible que si chaque ligne du fichier est un enregistrement pertinent. Elle n'est par exemple pas disponible par défaut pour les fichiers JSON.

Performances

Le tableau suivant présente les métriques de performances pour les opérations de lecture à partir de Cloud Storage. Les charges de travail ont été exécutées sur un nœud de calcul e2-standard2 à l'aide du SDK Apache Beam 2.49.0 pour Java. Elles n'ont pas utilisé l'exécuteur v2.

100 millions d'enregistrements | 1 Ko | 1 colonne Débit (octets) Débit (éléments)
Lecture 320 Mbit/s 320 000 éléments par seconde

Ces métriques sont basées sur des pipelines de traitement par lot simples. Elles ont pour but de comparer les performances entre les connecteurs d'E/S et ne sont pas nécessairement représentatives des pipelines réels. Les performances des pipelines Dataflow sont complexes et dépendent du type de machine virtuelle, des données traitées, des performances des sources et des récepteurs externes, ainsi que du code utilisateur. Les métriques sont basées sur l'exécution du SDK Java et ne sont pas représentatives des caractéristiques de performances des SDK d'autres langages. Pour en savoir plus, consultez la page Performances d'E/S Beam.

Bonnes pratiques

  • Évitez d'utiliser watchForNewFiles avec Cloud Storage. Cette approche n'est pas adaptée pour les pipelines de production volumineux en termes de scaling, car le connecteur doit conserver une liste de fichiers observés en mémoire. Cette liste ne peut pas être vidée de la mémoire, ce qui réduit la mémoire de travail des nœuds de calcul au fil du temps. Envisagez plutôt d'utiliser les notifications Pub/Sub pour Cloud Storage. Pour en savoir plus, consultez la page Modèles de traitement des fichiers.

  • Si le nom de fichier et le contenu du fichier sont tous deux des données utiles, utilisez la classe FileIO pour lire les noms de fichiers. Par exemple, un nom de fichier peut contenir des métadonnées utiles lors du traitement des données du fichier. Pour en savoir plus, consultez la section Accéder aux noms de fichiers. La documentation sur FileIO présente également un exemple de ce modèle.

Exemple

L'exemple suivant montre comment lire à partir de Cloud Storage.

Java

Pour vous authentifier auprès de Dataflow, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

public class ReadFromStorage {
  public static Pipeline createPipeline(Options options) {
    var pipeline = Pipeline.create(options);
    pipeline
        // Read from a text file.
        .apply(TextIO.read().from(
            "gs://" + options.getBucket() + "/*.txt"))
        .apply(
            MapElements.into(TypeDescriptors.strings())
                .via(
                    (x -> {
                      System.out.println(x);
                      return x;
                    })));
    return pipeline;
  }
}

Étape suivante