Ler dados do Cloud Storage para Dataflow

Para ler dados do Cloud Storage para Dataflow, use o conector de E/S TextIO ou AvroIO do Apache Beam.

Incluir a dependência da biblioteca Google Cloud

Para usar o conector TextIO ou AvroIOcom o Cloud Storage, inclua a dependência a seguir. Essa biblioteca fornece um gerenciador de esquema para os nomes de arquivo "gs://".

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam.version}</version>
</dependency>

Python

apache-beam[gcp]==VERSION

Go

import _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"

Para mais informações, consulte Instalar o SDK do Apache Beam.

Ativar o gRPC no conector de E/S do Apache Beam no Dataflow

É possível se conectar ao Cloud Storage usando o gRPC pelo conector de E/S do Apache Beam no Dataflow. O gRPC é um framework de chamada de procedimento remoto (RPC, na sigla em inglês) de código aberto de alta performance desenvolvido pelo Google que pode ser usado para interagir com o Cloud Storage.

Para acelerar as solicitações de leitura do job do Dataflow para o Cloud Storage, você pode ativar o conector de E/S do Apache Beam no Dataflow para usar o gRPC.

Linha de comando

  1. Use a versão 2.55.0 ou mais recente do SDK do Apache Beam.
  2. Para executar um job do Dataflow, use a opção de pipeline --additional-experiments=use_grpc_for_gcs. Para informações sobre as diferentes opções de pipeline, consulte Flags opcionais.

SDK do Apache Beam

  1. Use a versão 2.55.0 ou mais recente do SDK do Apache Beam.
  2. Para executar um job do Dataflow, use a opção de pipeline --experiments=use_grpc_for_gcs. Para informações sobre as diferentes opções de pipeline, consulte Opções básicas.

É possível configurar o conector de E/S do Apache Beam no Dataflow para gerar métricas relacionadas ao gRPC no Cloud Monitoring. As métricas relacionadas ao gRPC podem ajudar você a fazer o seguinte:

  • Monitore e otimize a performance das solicitações gRPC para o Cloud Storage.
  • Resolver e depurar problemas.
  • Receba insights sobre o uso e o comportamento do seu aplicativo.

Para informações sobre como configurar o conector de E/S do Apache Beam no Dataflow para gerar métricas relacionadas ao gRPC, consulte Usar métricas do lado do cliente. Se não for necessário coletar métricas para seu caso de uso, você pode desativar essa opção. Para instruções, consulte Desativar as métricas do lado do cliente.

Paralelismo

Os conectores TextIO e AvroIO são compatíveis com dois níveis de paralelismo:

  • Os arquivos individuais são codificados separadamente para que vários workers possam lê-los.
  • Se os arquivos não estiverem compactados, o conector consegue ler os subintervalos de cada arquivo separadamente, levando a um nível muito alto de paralelismo. Essa divisão só é possível se cada linha do arquivo for um registro significativo. Por exemplo, por padrão, ele está indisponível para arquivos JSON.

Desempenho

A tabela a seguir mostra as métricas de desempenho de leitura do Cloud Storage. As cargas de trabalho foram executadas em um worker e2-standard2 usando o SDK do Apache Beam 2.49.0 para Java. Eles não usaram o Runner v2.

100 milhões de registros | 1 KB | 1 coluna Capacidade de processamento (bytes) Capacidade de processamento (elementos)
Ler 320 MBps 320.000 elementos por segundo

Essas métricas são baseadas em pipelines de lote simples. Elas servem para comparar o desempenho entre conectores de E/S e não representam necessariamente pipelines reais. O desempenho do pipeline do Dataflow é complexo e depende do tipo de VM, dos dados processados, do desempenho de origens e coletores externos e do código do usuário. As métricas se baseiam na execução do SDK do Java e não representam as características de desempenho de outros SDKs da linguagem. Para mais informações, confira Desempenho do E/S do Beam.

Práticas recomendadas

Exemplo

O exemplo a seguir mostra como ler do Cloud Storage.

Java

Para autenticar no Dataflow, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

public class ReadFromStorage {
  public static Pipeline createPipeline(Options options) {
    var pipeline = Pipeline.create(options);
    pipeline
        // Read from a text file.
        .apply(TextIO.read().from(
            "gs://" + options.getBucket() + "/*.txt"))
        .apply(
            MapElements.into(TypeDescriptors.strings())
                .via(
                    (x -> {
                      System.out.println(x);
                      return x;
                    })));
    return pipeline;
  }
}

A seguir