Esegui lo streaming delle modifiche con Dataflow

Il connettore Beam Bigtable ti consente di utilizzare Dataflow per leggere i record delle modifiche dei dati di Bigtable senza dover monitorare o elaborare le modifiche della partizione nel codice, perché il connettore gestisce questa logica per te.

Questo documento descrive come configurare e utilizzare il connettore Beam Bigtable per leggere uno stream di modifiche utilizzando una pipeline Dataflow. Prima di leggere questo documento, devi leggere la Panoramica degli stream di modifiche e conoscere Dataflow.

Alternative alla creazione di una pipeline personalizzata

Se non vuoi creare la tua pipeline Dataflow, puoi utilizzare una delle seguenti opzioni.

Puoi utilizzare un modello Dataflow fornito da Google.

Puoi anche utilizzare gli esempi di codice del tutorial o della procedura di quickstart di Bigtable come punto di partenza per il tuo codice.

Assicurati che il codice generato utilizzigoogle cloud libraries-bom 26.14.0 o versioni successive.

Dettagli del connettore

Il metodo del connettore Bigtable Beam, BigtableIO.readChangeStream, ti consente di leggere uno stream di record di variazione dei dati (ChangeStreamMutation) che puoi elaborare. Il connettore Beam Bigtable è un componente del repository GitHub di Apache Beam. Per una descrizione del codice del connettore, consulta i commenti in BigtableIO.java.

Devi utilizzare il connettore con Beam versione 2.48.0 o successive. Controlla il supporto del runtime Apache Beam per assicurarti di utilizzare una versione di Java supportata. Dopodiché puoi implementare una pipeline che utilizza il connettore per Dataflow, che gestisce il provisioning e la gestione delle risorse e contribuisce alla scalabilità e all'affidabilità dell'elaborazione dei dati in streaming.

Per ulteriori informazioni sul modello di programmazione Apache Beam, consulta la documentazione di Beam.

Raggruppamento dei dati senza orari degli eventi

I record delle modifiche dei dati trasmessi in streaming utilizzando il connettore Beam Bigtable non sono compatibili con le funzioni Dataflow che dipendono dai tempi degli eventi.

Come spiegato in Replica e contrassegni di controllo, un contrassegno di controllo basso potrebbe non avanzare se la replica per la partizione non ha raggiunto il resto dell'istanza. Quando un'acqua bassa smette di avanzare, può provocare l'interruzione dello stream delle modifiche.

Per evitare che lo stream si blocchi, il connettore Beam Bigtable emette tutti i dati con un timestamp di output di zero. Il timestamp pari a zero fa sì che Dataflow consideri tutti i record di variazione dei dati come dati in ritardo. Di conseguenza, le funzionalità di Dataflow che dipendono dai tempi degli eventi non sono compatibili con modifiche in tempo reale di Bigtable. Nello specifico, non puoi utilizzare funzioni di finestra, attivatori in base al momento dell'evento o timer in base al momento dell'evento.

In alternativa, puoi utilizzare GlobalWindows con attivatori di ora non basati su eventi per raggruppare questi dati in ritardo in riquadri, come dimostrato nell'esempio del tutorial. Per informazioni dettagliate su trigger e riquadri, consulta la sezione Trigger della guida alla programmazione di Beam.

Scalabilità automatica

Il connettore supporta il scaling automatico di Dataflow, che è abilitato per impostazione predefinita quando si utilizza Runner v2 (obbligatorio). L'algoritmo di scalabilità automatica di Dataflow prende in considerazione il backlog stimato dello stream di modifiche, che può essere monitorato nella pagina Monitoraggio di Dataflow nella sezione Backlog. Utilizza il flag --maxNumWorkers durante il deployment di un job per limitare il numero di worker.

Per eseguire la scalabilità manuale della pipeline anziché utilizzare la scalabilità automatica, consulta Eseguire la scalabilità manuale di una pipeline di streaming.

Limitazioni

Tieni presente le seguenti limitazioni prima di utilizzare il connettore Beam Bigtable con Dataflow.

Dataflow Runner v2

Il connettore può essere eseguito solo utilizzando Dataflow Runner v2. Per attivare questa opzione, specifica --experiments=use_runner_v2 negli argomenti della riga di comando. L'esecuzione con Runner 1.0 causa il fallimento della pipeline con la seguente eccezione:

java.lang.UnsupportedOperationException: BundleFinalizer unsupported by non-portable Dataflow

Snapshot

Il connettore non supporta gli istantanei di Dataflow.

Duplicati

Il connettore Beam Bigtable trasmette le modifiche per ogni chiave di riga e ogni cluster in ordine di timestamp del commit, ma, poiché a volte si riavvia da epoche precedenti nello stream, può produrre duplicati.

Prima di iniziare

Prima di utilizzare il connettore, completa i seguenti prerequisiti.

Configura l'autenticazione

Per utilizzare gli Java esempi in questa pagina in un ambiente di sviluppo locale, installa e inizializza l'interfaccia alla gcloud CLI, quindi configura le Credenziali predefinite dell'applicazione con le tue credenziali utente.

  1. Install the Google Cloud CLI.
  2. To initialize the gcloud CLI, run the following command:

    gcloud init
  3. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

Per ulteriori informazioni, consulta Set up authentication for a local development environment.

Per informazioni sulla configurazione dell'autenticazione per un ambiente di produzione, consulta Set up Application Default Credentials for code running on Google Cloud.

Attivare un flusso di modifiche

Per poter leggere una tabella, devi attivare un flusso di modifiche su una tabella. Puoi anche creare una nuova tabella con modifiche in tempo reale abilitati.

Modificare la tabella dei metadati dello stream

Quando esegui lo streaming delle modifiche con Dataflow, il connettore Bigtable Beam crea una tabella dei metadati denominata __change_stream_md_table per impostazione predefinita. La tabella dei metadati dello stream di modifiche gestisce lo stato operativo del connettore e memorizza i metadati relativi ai record di variazione dei dati.

Per impostazione predefinita, il connettore crea la tabella nella stessa istanza della tabella in streaming. Per assicurarti che la tabella funzioni correttamente, il profilo dell'app per la tabella dei metadati deve utilizzare l'instradamento a cluster singolo e avere attivato le transazioni con una riga.

Per ulteriori informazioni su come eseguire lo streaming delle modifiche da Bigtable con il connettore Bigtable Beam, consulta la documentazione di BigtableIO.

Ruoli obbligatori

Per ottenere le autorizzazioni necessarie per leggere uno stream di modifiche Bigtable utilizzando Dataflow, chiedi all'amministratore di concederti i seguenti ruoli IAM.

Per leggere le modifiche da Bigtable, devi disporre di questo ruolo:

  • Amministratore Bigtable (roles/bigtable.admin) nell'istanza Bigtable contenente la tabella da cui vuoi eseguire lo streaming delle modifiche

Per eseguire il job Dataflow, devi disporre dei seguenti ruoli:

Per ulteriori informazioni sulla concessione dei ruoli, consulta Gestire l'accesso.

Potresti anche riuscire a ottenere le autorizzazioni richieste tramite ruoli personalizzati o altri ruoli predefiniti.

Aggiungi il connettore Bigtable Beam come dipendenza

Aggiungi codice simile alla seguente dipendenza al file pom.xml di Maven. La versione deve essere 2.48.0 o successiva.

<dependencies>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

Leggi il flusso di modifiche

Per creare una pipeline Dataflow per leggere i record di modifica dei dati, configura il connettore e poi aggiungi trasformazioni e sink. Poi, utilizza il connettore per leggere gli oggetti ChangeStreamMutation in una pipeline Beam.

Gli esempi di codice in questa sezione, scritti in Java, mostrano come creare una pipeline e utilizzarla per convertire le coppie chiave-valore in una stringa. Ogni coppia è composta da una chiave di riga e da un oggetto ChangeStreamMutation. La pipeline converte le voci di ogni oggetto in una stringa separata da virgole.

Crea la pipeline

Questo esempio di codice Java mostra come creare la pipeline:

BigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
Pipeline p = Pipeline.create(options);

final Instant startTime = Instant.now();

p.apply(
        "Read Change Stream",
        BigtableIO.readChangeStream()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withAppProfileId(options.getBigtableAppProfile())
            .withStartTime(startTime))
    .apply(
        "Flatten Mutation Entries",
        FlatMapElements.into(TypeDescriptors.strings())
            .via(ChangeStreamsHelloWorld::mutationEntriesToString))
    .apply(
        "Print mutations",
        ParDo.of(
            new DoFn<String, Void>() { // a DoFn as an anonymous inner class instance
              @ProcessElement
              public void processElement(@Element String mutation) {
                System.out.println("Change captured: " + mutation);
              }
            }));
p.run();

Elabora i record di modifica dei dati

Questo esempio mostra come eseguire un ciclo per tutte le voci di un record di modifica dei dati per una riga e chiamare un metodo di conversione in stringa in base al tipo di voce.

Per un elenco dei tipi di voci che un record di variazione dei dati può contenere, consulta Che cosa contiene un record di variazione dei dati.

static List<String> mutationEntriesToString(KV<ByteString, ChangeStreamMutation> mutationPair) {
  List<String> mutations = new ArrayList<>();
  String rowKey = mutationPair.getKey().toStringUtf8();
  ChangeStreamMutation mutation = mutationPair.getValue();
  MutationType mutationType = mutation.getType();
  for (Entry entry : mutation.getEntries()) {
    if (entry instanceof SetCell) {
      mutations.add(setCellToString(rowKey, mutationType, (SetCell) entry));
    } else if (entry instanceof DeleteCells) {
      mutations.add(deleteCellsToString(rowKey, mutationType, (DeleteCells) entry));
    } else if (entry instanceof DeleteFamily) {
      // Note: DeleteRow mutations are mapped into one DeleteFamily per-family
      mutations.add(deleteFamilyToString(rowKey, mutationType, (DeleteFamily) entry));
    } else {
      throw new RuntimeException("Entry type not supported.");
    }
  }
  return mutations;
}

In questo esempio viene convertita una voce write:

private static String setCellToString(String rowKey, MutationType mutationType, SetCell setCell) {
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "SetCell",
          setCell.getFamilyName(),
          setCell.getQualifier().toStringUtf8(),
          setCell.getValue().toStringUtf8());
  return String.join(",", mutationParts);
}

In questo esempio viene convertita una voce di eliminazione di celle:

private static String deleteCellsToString(
    String rowKey, MutationType mutationType, DeleteCells deleteCells) {
  String timestampRange =
      deleteCells.getTimestampRange().getStart() + "-" + deleteCells.getTimestampRange().getEnd();
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "DeleteCells",
          deleteCells.getFamilyName(),
          deleteCells.getQualifier().toStringUtf8(),
          timestampRange);
  return String.join(",", mutationParts);
}

In questo esempio viene convertita una voce di eliminazione di una famiglia di colonne:


private static String deleteFamilyToString(
    String rowKey, MutationType mutationType, DeleteFamily deleteFamily) {
  List<String> mutationParts =
      Arrays.asList(rowKey, mutationType.name(), "DeleteFamily", deleteFamily.getFamilyName());
  return String.join(",", mutationParts);
}

Monitoraggio

Le seguenti risorse nella console Google Cloud ti consentono di monitorare le tue risorse Google Cloud durante l'esecuzione di una pipeline Dataflow per leggere uno stream di modifiche Bigtable:

In particolare, controlla le seguenti metriche:

  • Nella pagina Monitoraggio di Bigtable, controlla queste metriche:
    • Dati sull'utilizzo della CPU per stream di modifiche nella metrica cpu_load_by_app_profile_by_method_by_table. Mostra l'impatto del flusso di modifiche sull'utilizzo della CPU del cluster.
    • Utilizzo dello spazio di archiviazione per le modifiche in tempo reale (byte) (change_stream_log_used_bytes).
  • Nella pagina di monitoraggio di Dataflow, seleziona Aggiornamento dei dati, che mostra la differenza tra l'ora corrente e il watermark. Dovrebbe durare circa due minuti, con picchi occasionali di un paio di minuti in più. Se la metrica di aggiornamento dei dati è costantemente superiore a questa soglia, la tua pipeline probabilmente non dispone di risorse sufficienti e dovresti aggiungere altri worker Dataflow. L'aggiornamento dei dati non indica se i record di modifica dei dati vengono elaborati lentamente.
  • La metrica processing_delay_from_commit_timestamp_MEAN Dataflow può indicare il tempo di elaborazione medio dei record di modifica dei dati durante la lifetime del job.

La metrica server/latencies di Bigtable non è utile quando monitori una pipeline Dataflow che legge un flusso di modifiche Bigtable, perché riflette la durata della richiesta di streaming, non la latenza di elaborazione dei record di modifica dei dati. Una latenza elevata in un stream di modifiche non significa che le richieste vengono elaborate lentamente, ma che la connessione è stata aperta per tutto questo tempo.

Passaggi successivi