Apache Beam simplifica el flujo de trabajo de enriquecimiento de datos proporcionando una transformación de enriquecimiento lista para usar que puedes añadir a tu flujo de procesamiento. En esta página se explica cómo usar la transformación de enriquecimiento de Apache Beam para enriquecer tus datos de streaming.
Cuando enriquece datos, aumenta los datos brutos de una fuente añadiendo datos relacionados de una segunda fuente. Los datos adicionales pueden proceder de diversas fuentes, como Bigtable o BigQuery. La transformación de enriquecimiento de Apache Beam usa una búsqueda de clave-valor para conectar los datos adicionales con los datos sin procesar.
En los siguientes ejemplos se muestran algunos casos en los que el enriquecimiento de datos es útil:
- Quieres crear una canalización de comercio electrónico que registre las actividades de los usuarios de un sitio web o una aplicación y proporcione recomendaciones personalizadas. La transformación incorpora las actividades a los datos de tu canalización para que puedas proporcionar las recomendaciones personalizadas.
- Tiene datos de usuario que quiere combinar con datos geográficos para hacer análisis basados en la geografía.
- Quieres crear una canalización que recoja datos de dispositivos de Internet de las Cosas (IoT) que envían eventos de telemetría.
Ventajas
La transformación de enriquecimiento tiene las siguientes ventajas:
- Transforma los datos sin que tengas que escribir código complejo ni gestionar bibliotecas subyacentes.
- Proporciona controladores de origen integrados.
- Usa el controlador
BigTableEnrichmentHandler
para enriquecer tus datos con una fuente de Bigtable sin tener que proporcionar detalles de configuración. - Usa el controlador
BigQueryEnrichmentHandler
para enriquecer tus datos mediante una fuente de BigQuery sin tener que proporcionar detalles de configuración. - Usa el controlador
VertexAIFeatureStoreEnrichmentHandler
con Vertex AI Feature Store y Bigtable online serving.
- Usa el controlador
- Usa la limitación del lado del cliente para gestionar la limitación de frecuencia de las solicitudes. Las solicitudes se reintentan con un tiempo de espera exponencial y una estrategia de reintento predeterminada. Puedes configurar la limitación de frecuencia para adaptarla a tu caso práctico.
Compatibilidad y limitaciones
La transformación de enriquecimiento tiene los siguientes requisitos:
- Disponible para flujos de procesamiento por lotes y de streaming.
- El controlador
BigTableEnrichmentHandler
está disponible en las versiones 2.54.0 y posteriores del SDK de Apache Beam para Python. - El controlador
BigQueryEnrichmentHandler
está disponible en las versiones 2.57.0 y posteriores del SDK de Apache Beam para Python. - El controlador
VertexAIFeatureStoreEnrichmentHandler
está disponible en las versiones 2.55.0 y posteriores del SDK de Apache Beam para Python. - Si usas las versiones 2.55.0 y posteriores del SDK de Apache Beam para Python, también debes instalar el cliente de Python para Redis.
- Las tareas de Dataflow deben usar Runner v2.
Usar la transformación de enriquecimiento
Para usar la transformación de enriquecimiento, incluye el siguiente código en tu canalización:
import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler
bigtable_handler = BigTableEnrichmentHandler(...)
with beam.Pipeline() as p:
output = (p
...
| "Create" >> beam.Create(data)
| "Enrich with Bigtable" >> Enrichment(bigtable_handler)
...
)
Como la transformación de enriquecimiento realiza una unión cruzada de forma predeterminada, diseña la unión personalizada para enriquecer los datos de entrada. Este diseño asegura que la combinación solo incluya los campos especificados.
En el siguiente ejemplo, left
es el elemento de entrada de la transformación de enriquecimiento y right
son los datos obtenidos de un servicio externo para ese elemento de entrada.
def custom_join(left: Dict[str, Any], right: Dict[str, Any]):
enriched = {}
enriched['FIELD_NAME'] = left['FIELD_NAME']
...
return beam.Row(**enriched)
Parámetros
Para usar la transformación de enriquecimiento, es obligatorio incluir el parámetro EnrichmentHandler
.
También puedes usar un parámetro de configuración para especificar una función lambda
para una función de unión, un tiempo de espera, un limitador o un repetidor (estrategia de reintento). Están disponibles los siguientes parámetros de configuración:
join_fn
: funciónlambda
que toma diccionarios como entrada y devuelve una fila enriquecida (Callable[[Dict[str, Any], Dict[str, Any]], beam.Row]
). La fila enriquecida especifica cómo combinar los datos obtenidos de la API. El valor predeterminado es una combinación cruzada.timeout
: número de segundos que debe esperar la API a que se complete la solicitud antes de que se agote el tiempo de espera. El valor predeterminado es 30 segundos.throttler
: especifica el mecanismo de limitación. La única opción admitida es la limitación adaptativa predeterminada del lado del cliente.repeater
: especifica la estrategia de reintento cuando se producen errores comoTooManyRequests
yTimeoutException
. El valor predeterminado esExponentialBackOffRepeater
.
Siguientes pasos
- Para ver más ejemplos, consulta la sección Transformación de enriquecimiento del catálogo de transformaciones de Apache Beam.
- Usa Apache Beam y Bigtable para enriquecer datos.
- Usa Apache Beam y BigQuery para enriquecer datos.
- Usa Apache Beam y Vertex AI Feature Store para enriquecer datos.