Enrich streaming data
Apache Beam simplifies the data enrichment workflow by providing a turnkey enrichment transform that you can add to your pipeline. This page explains how to use the Apache Beam enrichment transform to enrich your streaming data.
When you enrich data, you augment the raw data from one source by adding related data from a second source. The additional data can come from a variety of sources, such as Bigtable or BigQuery. The Apache Beam enrichment transform uses a key-value lookup to connect the additional data to the raw data.
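To make the key-value lookup concrete, here is a minimal plain-Python sketch of the idea. It is not the Beam API, and all field names (`user_id`, `clicks`, `city`) are hypothetical:

```python
# Conceptual sketch of key-value enrichment (plain Python, not the Beam API).
# Raw events from one source are augmented with related fields from a second
# source, matched by key. All names here are hypothetical.
raw_events = [
    {"user_id": "u1", "clicks": 3},
    {"user_id": "u2", "clicks": 5},
]
profiles = {  # second source, keyed by user_id
    "u1": {"city": "Jakarta"},
    "u2": {"city": "Bandung"},
}

# Each raw event is merged with the related record looked up by its key.
enriched = [{**event, **profiles[event["user_id"]]} for event in raw_events]
```

The enrichment transform performs this kind of lookup against an external store for you, per element, inside the pipeline.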
The following examples provide some cases where data enrichment is useful:
You want to create an ecommerce pipeline that captures user activities from a website or app and provides customized recommendations. The transform incorporates the activities into your pipeline data so that you can provide the customized recommendations.
You have user data that you want to join with geographical data to do geography-based analytics.
You want to create a pipeline that gathers data from internet-of-things (IoT) devices that send out telemetry events.
Benefits
The enrichment transform has the following benefits:
Transforms your data without requiring you to write complex code or manage underlying libraries.
Provides built-in source handlers.
Use the BigTableEnrichmentHandler handler to enrich your data by using a Bigtable source without passing configuration details.
Use the BigQueryEnrichmentHandler handler to enrich your data by using a BigQuery source without passing configuration details.
Use the VertexAIFeatureStoreEnrichmentHandler handler with Vertex AI Feature Store and Bigtable online serving.
Uses client-side throttling to manage rate limiting of requests. Requests are exponentially backed off with a default retry strategy. You can configure rate limiting to suit your use case.
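As a rough illustration of exponential backoff, the delay between retries grows by a constant factor after each attempt. This plain-Python sketch only mimics the spirit of the default strategy and is not the Beam implementation:

```python
def backoff_delays(base: float = 1.0, factor: float = 2.0, retries: int = 5):
    """Illustrative exponential backoff schedule (not the Beam implementation).

    Returns the wait, in seconds, before each successive retry.
    """
    return [base * (factor ** attempt) for attempt in range(retries)]

delays = backoff_delays()  # delays double after each failed attempt
```

In practice, backoff schedules often also add random jitter so that many clients do not retry in lockstep.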
Support and limitations
The enrichment transform has the following requirements:
Available for batch and streaming pipelines.
The BigTableEnrichmentHandler handler is available in Apache Beam Python SDK versions 2.54.0 and later.
The BigQueryEnrichmentHandler handler is available in Apache Beam Python SDK versions 2.57.0 and later.
The VertexAIFeatureStoreEnrichmentHandler handler is available in Apache Beam Python SDK versions 2.55.0 and later.
When using Apache Beam Python SDK versions 2.55.0 and later, you must also install the Python client for Redis.
Dataflow jobs must use Runner v2.
Use the enrichment transform
To use the enrichment transform, include the following code in your pipeline:

    import apache_beam as beam
    from apache_beam.transforms.enrichment import Enrichment
    from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler

    bigtable_handler = BigTableEnrichmentHandler(...)

    with beam.Pipeline() as p:
        output = (p
                  ...
                  | "Create" >> beam.Create(data)
                  | "Enrich with Bigtable" >> Enrichment(bigtable_handler)
                  ...
                  )
Because the enrichment transform performs a cross join by default, design a custom join to enrich the input data. This design ensures that the join includes only the specified fields.

In the following example, left is the input element of the enrichment transform, and right is the data fetched from an external service for that input element.

    def custom_join(left: Dict[str, Any], right: Dict[str, Any]):
        enriched = {}
        enriched['FIELD_NAME'] = left['FIELD_NAME']
        ...
        return beam.Row(**enriched)
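The merge logic of such a custom join can be exercised on its own with plain dictionaries. This sketch uses hypothetical field names (`user_id`, `segment`) and returns a dict instead of beam.Row so that it runs without a pipeline:

```python
from typing import Any, Dict

def custom_join(left: Dict[str, Any], right: Dict[str, Any]) -> Dict[str, Any]:
    # Keep only the specified fields: one from the input element (left) and
    # one from the data fetched for it (right). Field names are hypothetical.
    return {"user_id": left["user_id"], "segment": right.get("segment")}

row = custom_join(
    {"user_id": "u1", "clicks": 7},        # input element of the transform
    {"segment": "premium", "score": 0.9},  # data fetched from the external service
)
```

Note how fields not named in the join (clicks, score) are dropped, unlike the default cross join, which would merge every field from both sides.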
Parameters
To use the enrichment transform, the EnrichmentHandler parameter is required.
You can also use configuration parameters to specify a lambda function for a join function, a timeout, a throttler, or a repeater (retry strategy). The following configuration parameters are available:
join_fn: A lambda function that takes dictionaries as input and returns an enriched row (Callable[[Dict[str, Any], Dict[str, Any]], beam.Row]). The enriched row specifies how to join the data fetched from the API. Defaults to a cross join.
timeout: The number of seconds to wait for a request to be completed by the API before timing out. Defaults to 30 seconds.
throttler: Specifies the throttling mechanism. The only supported option is the default client-side adaptive throttling.
repeater: Specifies the retry strategy when errors such as TooManyRequests and TimeoutException occur. Defaults to ExponentialBackOffRepeater.
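To illustrate what a repeater does, the following plain-Python retry loop retries a failing request with exponentially growing delays. It only mimics the behavior of ExponentialBackOffRepeater and is not the Beam implementation; TooManyRequests here is a local stand-in class:

```python
import time

class TooManyRequests(Exception):
    """Stand-in for a rate-limit error raised by the external service."""

def call_with_retries(request, max_attempts: int = 4, base_delay: float = 0.01):
    # Retry on TooManyRequests, doubling the delay after each failed attempt;
    # re-raise once the attempt budget is exhausted.
    for attempt in range(max_attempts):
        try:
            return request()
        except TooManyRequests:
            if attempt == max_attempts - 1:
                raise
            time.sleep(base_delay * (2 ** attempt))

attempts = {"n": 0}
def flaky_request():
    # Fails twice, then succeeds, simulating transient rate limiting.
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise TooManyRequests()
    return "ok"

result = call_with_retries(flaky_request)
```

The transform applies this kind of strategy for you, so a transient rate-limit error does not fail the pipeline outright.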
What's next
For more examples, see the Enrichment transform page in the Apache Beam transform catalog (https://beam.apache.org/documentation/transforms/python/elementwise/enrichment).
Use Apache Beam and Bigtable to enrich data.
Use Apache Beam and BigQuery to enrich data.
Use Apache Beam and Vertex AI Feature Store to enrich data.

Last updated 2025-08-18 UTC.