Arricchire i dati in streaming

Apache Beam semplifica il flusso di lavoro di arricchimento dei dati fornendo una trasformazione di arricchimento chiavi in mano che puoi aggiungere alla tua pipeline. Questa pagina spiega come utilizzare la trasformazione di arricchimento Apache Beam per arricchire i dati in streaming.

Quando arricchisci i dati, aumenti i dati non elaborati di un'origine aggiungendo quelli correlati di una seconda origine. I dati aggiuntivi possono provenire da varie origini, ad esempio Bigtable o BigQuery. La trasformazione di arricchimento di Apache Beam utilizza una ricerca chiave-valore per collegare i dati aggiuntivi ai dati non elaborati.

Di seguito sono riportati alcuni casi in cui l'arricchimento dei dati è utile:

  • Vuoi creare una pipeline di e-commerce che acquisisca le attività utente da un sito web o un'app e fornisca consigli personalizzati. La trasformazione incorpora le attività nei dati della pipeline in modo da poter fornire i consigli personalizzati.
  • Hai dati utente che vuoi unire a dati geografici per eseguire analisi basate sulla geografia.
  • Vuoi creare una pipeline che raccolga i dati dei dispositivi internet of things (IoT) che inviano eventi di telemetria.

Vantaggi

La trasformazione di arricchimento presenta i seguenti vantaggi:

  • Trasforma i dati senza richiedere la scrittura di codice complesso o la gestione delle librerie sottostanti.
  • Fornisce gestori delle origini integrati.
  • Utilizza la limitazione lato client per gestire il limitazione di frequenza delle richieste. Le richieste vengono sottoposte a backoff esponenziale con una strategia di ripetizione predefinita. Puoi configurare limitazione di frequenza in base al tuo caso d'uso.

Supporto e limitazioni

La trasformazione di arricchimento ha i seguenti requisiti:

  • Disponibile per le pipeline batch e in streaming.
  • Il gestore BigTableEnrichmentHandler è disponibile nell'SDK Apache Beam per Python 2.54.0 e versioni successive.
  • Il gestore BigQueryEnrichmentHandler è disponibile nell'SDK Apache Beam per Python nelle versioni 2.57.0 e successive.
  • Il gestore VertexAIFeatureStoreEnrichmentHandler è disponibile nell'SDK Apache Beam per Python 2.55.0 e versioni successive.
  • Quando utilizzi le versioni 2.55.0 e successive dell'SDK Apache Beam per Python, devi anche installare il client Python per Redis.
  • I job Dataflow devono utilizzare Runner v2.

Utilizzare la trasformazione di arricchimento

Per utilizzare la trasformazione di arricchimento, includi il seguente codice nella pipeline:

import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler

bigtable_handler = BigTableEnrichmentHandler(...)

with beam.Pipeline() as p:
  output = (p
            ...
            | "Create" >> beam.Create(data)
            | "Enrich with Bigtable" >> Enrichment(bigtable_handler)
            ...
            )

Poiché la trasformazione di arricchimento esegue un join croce per impostazione predefinita, progetta il join personalizzato per arricchire i dati di input. Questo design garantisce che l'unione includa solo i campi specificati.

Nell'esempio seguente, left è l'elemento di input della trasformazione di arricchimento e right sono i dati recuperati da un servizio esterno per quell'elemento di input.

def custom_join(left: Dict[str, Any], right: Dict[str, Any]):
  enriched = {}
  enriched['FIELD_NAME'] = left['FIELD_NAME']
  ...
  return beam.Row(**enriched)

Parametri

Per utilizzare la trasformazione di arricchimento, è necessario il parametro EnrichmentHandler.

Puoi anche utilizzare un parametro di configurazione per specificare una funzione lambda per una funzione di join, un timeout, un regolatore o un ripetitore (strategia di ripetizione). Sono disponibili i seguenti parametri di configurazione:

  • join_fn: una funzione lambda che accetta dizionari come input e restituisce una riga arricchita (Callable[[Dict[str, Any], Dict[str, Any]], beam.Row]). La riga arricchita specifica come unire i dati recuperati dall'API. Il valore predefinito è un join croce.
  • timeout: il numero di secondi di attesa per il completamento della richiesta da parte dell'API prima del timeout. Il valore predefinito è 30 secondi.
  • throttler: specifica il meccanismo di limitazione. L'unica opzione supportata è la gestione della larghezza di banda adattiva lato client predefinita.
  • repeater: specifica la strategia di nuovo tentativo quando si verificano errori come TooManyRequests e TimeoutException. Il valore predefinito è ExponentialBackOffRepeater.

Passaggi successivi