Apache Beam 提供可新增至管道的現成擴充轉換,簡化資料擴充工作流程。本頁說明如何使用 Apache Beam 的資料擴充轉換功能,擴充串流資料。
資料擴充功能會從一個來源取得原始資料,並從第二個來源加入相關資料,其他資料來源可以是 Bigtable 或 BigQuery 等。Apache Beam 擴充轉換會使用鍵/值查詢,將額外資料連結至原始資料。
以下列舉幾個適合使用資料擴充功能的案例:
- 您想建立電子商務管道,擷取網站或應用程式的使用者活動,並提供個人化建議。這項轉換作業會將活動併入管道資料,方便您提供個人化建議。
- 您有使用者資料,想與地理資料合併,以便進行地理位置分析。
- 您想建立管線,從傳送遙測事件的物聯網 (IoT) 裝置收集資料。
優點
充實轉換具有下列優點:
- 轉換資料,不必編寫複雜的程式碼或管理基礎程式庫。
- 提供內建來源處理常式。
- 使用
BigTableEnrichmentHandler
處理常式,透過 Bigtable 來源擴充資料,不必傳遞設定詳細資料。 - 使用
BigQueryEnrichmentHandler
處理常式,透過 BigQuery 來源擴充資料,不必傳遞設定詳細資料。 - 使用
VertexAIFeatureStoreEnrichmentHandler
處理常式搭配 Vertex AI 特徵儲存庫和 Bigtable 線上服務。
- 使用
- 使用用戶端節流機制管理要求頻率限制。要求會以指數輪詢方式重試,並採用預設的重試策略。您可以根據用途設定速率限制。
支援與限制
擴充轉換必須符合下列條件:
- 適用於批次和串流管道。
BigTableEnrichmentHandler
處理常式適用於 Apache Beam Python SDK 2.54.0 以上版本。BigQueryEnrichmentHandler
處理常式適用於 Apache Beam Python SDK 2.57.0 以上版本。VertexAIFeatureStoreEnrichmentHandler
處理常式適用於 Apache Beam Python SDK 2.55.0 以上版本。- 使用 Apache Beam Python SDK 2.55.0 以上版本時,您也需要安裝 Redis 的 Python 用戶端。
- Dataflow 工作必須使用 Runner v2。
使用擴充轉換
如要使用擴充轉換,請在管道中加入下列程式碼:
import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler
bigtable_handler = BigTableEnrichmentHandler(...)
with beam.Pipeline() as p:
output = (p
...
| "Create" >> beam.Create(data)
| "Enrich with Bigtable" >> Enrichment(bigtable_handler)
...
)
由於擴充轉換作業預設會執行交叉聯結,請設計自訂聯結來擴充輸入資料。這項設計可確保聯結只包含指定的欄位。
在下列範例中,left
是擴充轉換的輸入元素,而 right
是從外部服務擷取的該輸入元素資料。
def custom_join(left: Dict[str, Any], right: Dict[str, Any]):
enriched = {}
enriched['FIELD_NAME'] = left['FIELD_NAME']
...
return beam.Row(**enriched)
參數
如要使用擴充轉換,必須提供 EnrichmentHandler
參數。
您也可以使用設定參數,為聯結函式、逾時、節流器或中繼器 (重試策略) 指定 lambda
函式。可用的設定參數如下:
join_fn
:lambda
函式,會將字典做為輸入內容,並傳回經過擴充的資料列 (Callable[[Dict[str, Any], Dict[str, Any]], beam.Row]
)。經過擴充的資料列會指定如何聯結從 API 擷取的資料。預設為交叉聯結。timeout
:API 在逾時前等待要求完成的秒數。預設值為 30 秒。throttler
:指定節流機制。目前僅支援預設的用戶端自適應節流。repeater
:指定發生TooManyRequests
和TimeoutException
等錯誤時的重試策略。預設值為ExponentialBackOffRepeater
。
後續步驟
- 如需更多範例,請參閱 Apache Beam 轉換目錄中的擴充轉換。
- 使用 Apache Beam 和 Bigtable 擴充資料。
- 使用 Apache Beam 和 BigQuery 充實資料。
- 使用 Apache Beam 和 Vertex AI 特徵儲存庫擴充資料。