充實串流資料

Apache Beam 提供可新增至管道的現成擴充轉換,簡化資料擴充工作流程。本頁說明如何使用 Apache Beam 的資料擴充轉換功能,擴充串流資料。

資料擴充功能會從一個來源取得原始資料,並從第二個來源加入相關資料,其他資料來源可以是 BigtableBigQuery 等。Apache Beam 擴充轉換會使用鍵/值查詢,將額外資料連結至原始資料。

以下列舉幾個適合使用資料擴充功能的案例:

  • 您想建立電子商務管道,擷取網站或應用程式的使用者活動,並提供個人化建議。這項轉換作業會將活動併入管道資料,方便您提供個人化建議。
  • 您有使用者資料,想與地理資料合併,以便進行地理位置分析。
  • 您想建立管線,從傳送遙測事件的物聯網 (IoT) 裝置收集資料。

優點

充實轉換具有下列優點:

支援與限制

擴充轉換必須符合下列條件:

  • 適用於批次和串流管道。
  • BigTableEnrichmentHandler 處理常式適用於 Apache Beam Python SDK 2.54.0 以上版本。
  • BigQueryEnrichmentHandler 處理常式適用於 Apache Beam Python SDK 2.57.0 以上版本。
  • VertexAIFeatureStoreEnrichmentHandler 處理常式適用於 Apache Beam Python SDK 2.55.0 以上版本。
  • 使用 Apache Beam Python SDK 2.55.0 以上版本時,您也需要安裝 Redis 的 Python 用戶端
  • Dataflow 工作必須使用 Runner v2

使用擴充轉換

如要使用擴充轉換,請在管道中加入下列程式碼:

import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler

bigtable_handler = BigTableEnrichmentHandler(...)

with beam.Pipeline() as p:
  output = (p
            ...
            | "Create" >> beam.Create(data)
            | "Enrich with Bigtable" >> Enrichment(bigtable_handler)
            ...
            )

由於擴充轉換作業預設會執行交叉聯結,請設計自訂聯結來擴充輸入資料。這項設計可確保聯結只包含指定的欄位。

在下列範例中,left 是擴充轉換的輸入元素,而 right 是從外部服務擷取的該輸入元素資料。

def custom_join(left: Dict[str, Any], right: Dict[str, Any]):
  enriched = {}
  enriched['FIELD_NAME'] = left['FIELD_NAME']
  ...
  return beam.Row(**enriched)

參數

如要使用擴充轉換,必須提供 EnrichmentHandler 參數。

您也可以使用設定參數,為聯結函式、逾時、節流器或中繼器 (重試策略) 指定 lambda 函式。可用的設定參數如下:

  • join_fnlambda 函式,會將字典做為輸入內容,並傳回經過擴充的資料列 (Callable[[Dict[str, Any], Dict[str, Any]], beam.Row])。經過擴充的資料列會指定如何聯結從 API 擷取的資料。預設為交叉聯結。
  • timeout:API 在逾時前等待要求完成的秒數。預設值為 30 秒。
  • throttler:指定節流機制。目前僅支援預設的用戶端自適應節流。
  • repeater:指定發生 TooManyRequestsTimeoutException 等錯誤時的重試策略。預設值為 ExponentialBackOffRepeater

後續步驟