Apache Beam semplifica il flusso di lavoro di arricchimento dei dati fornendo una trasformazione di arricchimento chiavi in mano che puoi aggiungere alla tua pipeline. Questa pagina spiega come utilizzare la trasformazione di arricchimento Apache Beam per arricchire i dati in streaming.
Quando arricchisci i dati, aumenti i dati non elaborati di un'origine aggiungendo quelli correlati di una seconda origine. I dati aggiuntivi possono provenire da varie origini, ad esempio Bigtable o BigQuery. La trasformazione di arricchimento di Apache Beam utilizza una ricerca chiave-valore per collegare i dati aggiuntivi ai dati non elaborati.
Di seguito sono riportati alcuni casi in cui l'arricchimento dei dati è utile:
- Vuoi creare una pipeline di e-commerce che acquisisca le attività utente da un sito web o un'app e fornisca consigli personalizzati. La trasformazione incorpora le attività nei dati della pipeline in modo da poter fornire i consigli personalizzati.
- Hai dati utente che vuoi unire a dati geografici per eseguire analisi basate sulla geografia.
- Vuoi creare una pipeline che raccolga i dati dei dispositivi internet of things (IoT) che inviano eventi di telemetria.
Vantaggi
La trasformazione di arricchimento presenta i seguenti vantaggi:
- Trasforma i dati senza richiedere la scrittura di codice complesso o la gestione delle librerie sottostanti.
- Fornisce gestori delle origini integrati.
- Utilizza l'handler
BigTableEnrichmentHandler
per arricchire i dati utilizzando un'origine Bigtable senza passare i dettagli di configurazione. - Utilizza l'handler
BigQueryEnrichmentHandler
per arricchire i dati utilizzando un'origine BigQuery senza passare i dettagli di configurazione. - Utilizza il gestore
VertexAIFeatureStoreEnrichmentHandler
con Vertex AI Feature Store e la pubblicazione online di Bigtable.
- Utilizza l'handler
- Utilizza la limitazione lato client per gestire il limitazione di frequenza delle richieste. Le richieste vengono sottoposte a backoff esponenziale con una strategia di ripetizione predefinita. Puoi configurare limitazione di frequenza in base al tuo caso d'uso.
Supporto e limitazioni
La trasformazione di arricchimento ha i seguenti requisiti:
- Disponibile per le pipeline batch e in streaming.
- Il gestore
BigTableEnrichmentHandler
è disponibile nell'SDK Apache Beam per Python 2.54.0 e versioni successive. - Il gestore
BigQueryEnrichmentHandler
è disponibile nell'SDK Apache Beam per Python nelle versioni 2.57.0 e successive. - Il gestore
VertexAIFeatureStoreEnrichmentHandler
è disponibile nell'SDK Apache Beam per Python 2.55.0 e versioni successive. - Quando utilizzi le versioni 2.55.0 e successive dell'SDK Apache Beam per Python, devi anche installare il client Python per Redis.
- I job Dataflow devono utilizzare Runner v2.
Utilizzare la trasformazione di arricchimento
Per utilizzare la trasformazione di arricchimento, includi il seguente codice nella pipeline:
import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler
bigtable_handler = BigTableEnrichmentHandler(...)
with beam.Pipeline() as p:
output = (p
...
| "Create" >> beam.Create(data)
| "Enrich with Bigtable" >> Enrichment(bigtable_handler)
...
)
Poiché la trasformazione di arricchimento esegue un join croce per impostazione predefinita, progetta il join personalizzato per arricchire i dati di input. Questo design garantisce che l'unione includa solo i campi specificati.
Nell'esempio seguente, left
è l'elemento di input della trasformazione di arricchimento e right
sono i dati recuperati da un servizio esterno per quell'elemento di input.
def custom_join(left: Dict[str, Any], right: Dict[str, Any]):
enriched = {}
enriched['FIELD_NAME'] = left['FIELD_NAME']
...
return beam.Row(**enriched)
Parametri
Per utilizzare la trasformazione di arricchimento, è necessario il parametro EnrichmentHandler
.
Puoi anche utilizzare un parametro di configurazione per specificare una funzione lambda
per una funzione di join, un timeout, un regolatore o un ripetitore (strategia di ripetizione). Sono disponibili i seguenti parametri di configurazione:
join_fn
: una funzionelambda
che accetta dizionari come input e restituisce una riga arricchita (Callable[[Dict[str, Any], Dict[str, Any]], beam.Row]
). La riga arricchita specifica come unire i dati recuperati dall'API. Il valore predefinito è un join croce.timeout
: il numero di secondi di attesa per il completamento della richiesta da parte dell'API prima del timeout. Il valore predefinito è 30 secondi.throttler
: specifica il meccanismo di limitazione. L'unica opzione supportata è la gestione della larghezza di banda adattiva lato client predefinita.repeater
: specifica la strategia di nuovo tentativo quando si verificano errori comeTooManyRequests
eTimeoutException
. Il valore predefinito èExponentialBackOffRepeater
.
Passaggi successivi
- Per altri esempi, consulta Trasformazione di arricchimento nel catalogo delle trasformazioni di Apache Beam.
- Utilizza Apache Beam e Bigtable per arricchire i dati.
- Utilizza Apache Beam e BigQuery per arricchire i dati.
- Utilizza Apache Beam e Vertex AI Feature Store per arricchire i dati.