Gravar do Dataflow para o Cloud Storage

Neste documento, descrevemos como gravar dados de texto do Dataflow para o Cloud Storage usando o conector de E/S do Apache Beam TextIO.

Incluir a dependência da biblioteca Google Cloud

Para usar o conector TextIO com o Cloud Storage, inclua a dependência a seguir. Essa biblioteca fornece um gerenciador de esquema para os nomes de arquivo "gs://".

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam.version}</version>
</dependency>

Python

apache-beam[gcp]==VERSION

Go

import _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"

Para mais informações, consulte Instalar o SDK do Apache Beam.

Ativar o gRPC no conector de E/S do Apache Beam no Dataflow

É possível se conectar ao Cloud Storage usando o gRPC pelo conector de E/S do Apache Beam no Dataflow. O gRPC é um framework de chamada de procedimento remoto (RPC, na sigla em inglês) de código aberto de alta performance desenvolvido pelo Google que pode ser usado para interagir com o Cloud Storage.

Para acelerar as solicitações de gravação do job do Dataflow no Cloud Storage, é possível ativar o conector de E/S do Apache Beam no Dataflow para usar o gRPC.

Linha de comando

  1. Use a versão 2.55.0 ou mais recente do SDK do Apache Beam.
  2. Para executar um job do Dataflow, use a opção de pipeline --additional-experiments=use_grpc_for_gcs. Para informações sobre as diferentes opções de pipeline, consulte Flags opcionais.

SDK do Apache Beam

  1. Use a versão 2.55.0 ou mais recente do SDK do Apache Beam.
  2. Para executar um job do Dataflow, use a opção de pipeline --experiments=use_grpc_for_gcs. Para informações sobre as diferentes opções de pipeline, consulte Opções básicas.

É possível configurar o conector de E/S do Apache Beam no Dataflow para gerar métricas relacionadas ao gRPC no Cloud Monitoring. As métricas relacionadas ao gRPC podem ajudar você a fazer o seguinte:

  • Monitore e otimize a performance das solicitações gRPC para o Cloud Storage.
  • Resolver e depurar problemas.
  • Receba insights sobre o uso e o comportamento do seu aplicativo.

Para informações sobre como configurar o conector de E/S do Apache Beam no Dataflow para gerar métricas relacionadas ao gRPC, consulte Usar métricas do lado do cliente. Se não for necessário coletar métricas para seu caso de uso, você pode desativar essa opção. Para instruções, consulte Desativar as métricas do lado do cliente.

Paralelismo

O paralelismo é determinado principalmente pelo número de fragmentos. Por padrão, o executor define esse valor automaticamente. Para a maioria dos pipelines, é recomendável usar o comportamento padrão. Confira as Práticas recomendadas neste documento.

Desempenho

A tabela a seguir mostra as métricas de desempenho para gravar no Cloud Storage. As cargas de trabalho foram executadas em um worker e2-standard2 usando o SDK do Apache Beam 2.49.0 para Java. Eles não usaram o Runner v2.

100 milhões de registros | 1 KB | 1 coluna Capacidade de processamento (bytes) Capacidade de processamento (elementos)
Gravar 130 MBps 130.000 elementos por segundo

Essas métricas são baseadas em pipelines de lote simples. Elas servem para comparar o desempenho entre conectores de E/S e não representam necessariamente pipelines reais. O desempenho do pipeline do Dataflow é complexo e depende do tipo de VM, dos dados processados, do desempenho de origens e coletores externos e do código do usuário. As métricas se baseiam na execução do SDK do Java e não representam as características de desempenho de outros SDKs da linguagem. Para mais informações, confira Desempenho do E/S do Beam.

Práticas recomendadas

  • Em geral, evite definir um número específico de fragmentos. Isso permite que o executor selecione um valor adequado para a escala. Para ativar o particionamento automático, chame .withAutoSharding(), não .⇉withNumShards⇇(0). Se você ajustar o número de fragmentos, recomendamos uma gravação entre 100 MB e 1 GB por fragmento. No entanto, o valor ideal depende da carga de trabalho.

  • O Cloud Storage pode escalonar para um número muito grande de solicitações por segundo. No entanto, se o pipeline tiver picos grandes no volume de gravação, grave em vários buckets para evitar a sobrecarga temporária de algum bucket do Cloud Storage.

  • Em geral, a gravação no Cloud Storage é mais eficiente quando cada gravação é maior (1 KB ou mais). Gravar pequenos registros em um grande número de arquivos pode resultar em um desempenho ruim por byte.

  • Ao gerar nomes de arquivos, use nomes não sequenciais para distribuir a carga. Para mais informações, confira Usar uma convenção de nomenclatura que distribua a carga uniformemente pelos intervalos de chaves.

  • Ao nomear arquivos, não use arroba ('@') seguido de um número em um asterisco ('*'). Para mais informações, confira "@*" e "@N" são especificações de fragmentação reservadas.

Exemplo: Gravar arquivos de texto para o Cloud Storage

O exemplo a seguir cria um pipeline em lote que grava arquivos de texto usando a compactação GZIP:

Java

Para autenticar no Dataflow, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BatchWriteStorage {
  public interface Options extends PipelineOptions {
    @Description("The Cloud Storage bucket to write to")
    String getBucketName();

    void setBucketName(String value);
  }

  // Write text data to Cloud Storage
  public static void main(String[] args) {
    final List<String> wordsList = Arrays.asList("1", "2", "3", "4");

    var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    var pipeline = Pipeline.create(options);
    pipeline
        .apply(Create
            .of(wordsList))
        .apply(TextIO
            .write()
            .to(options.getBucketName())
            .withSuffix(".txt")
            .withCompression(Compression.GZIP)
        );
    pipeline.run().waitUntilFinish();
  }
}

Se a entrada PCollection for ilimitada, você precisará definir uma janela ou um gatilho na coleção e especificar gravações em janelas chamando TextIO.Write.withWindowedWrites.

Python

Para autenticar no Dataflow, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

import argparse
from typing import List

import apache_beam as beam
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions

from typing_extensions import Self


def write_to_cloud_storage(argv: List[str] = None) -> None:
    # Parse the pipeline options passed into the application.
    class MyOptions(PipelineOptions):
        @classmethod
        # Define a custom pipeline option that specfies the Cloud Storage bucket.
        def _add_argparse_args(cls: Self, parser: argparse.ArgumentParser) -> None:
            parser.add_argument("--output", required=True)

    wordsList = ["1", "2", "3", "4"]
    options = MyOptions()

    with beam.Pipeline(options=options.view_as(PipelineOptions)) as pipeline:
        (
            pipeline
            | "Create elements" >> beam.Create(wordsList)
            | "Write Files" >> WriteToText(options.output, file_name_suffix=".txt")
        )

Para o caminho de saída, especifique um caminho do Cloud Storage que inclua o nome do bucket e um prefixo de nome de arquivo. Por exemplo, se você especificar gs://my_bucket/output/file, o conector TextIO gravará no bucket do Cloud Storage chamado my_bucket e os arquivos de saída terão o prefixo output/file*.

Por padrão, o conector TextIO fragmenta os arquivos de saída usando uma convenção de nomenclatura como esta: <file-prefix>-00000-of-00001. Também é possível especificar um sufixo de nome de arquivo e um esquema de compactação, conforme mostrado no exemplo.

Para garantir gravações idempotentes, o Dataflow grava em um arquivo temporário e copia o arquivo temporário concluído para o arquivo final. Para controlar o local de armazenamento desses arquivos temporários, use o método withTempDirectory.

A seguir