本文說明如何將 Dataflow 的資料寫入 Apache Kafka。
在大多數情況下,建議使用受管理 I/O 連接器寫入 Kafka。
如需更進階的效能調整,請考慮使用 KafkaIO
連接器。KafkaIO
連接器適用於 Java,或透過 多語言管道架構用於 Python 和 Go。
僅需處理一次
根據預設,KafkaIO
連接器不會提供寫入作業的僅限一次語意。也就是說,資料可能會多次寫入 Kafka 主題。如要啟用「只寫入一次」功能,請呼叫 withEOS
方法。「只寫一次」保證資料只會寫入目的地 Kafka 主題一次。不過,這也會增加管道成本並降低總處理量。
如果您對「只傳送一次」語意沒有嚴格要求,且管道中的邏輯可以處理重複記錄,建議為整個管道啟用「至少傳送一次」模式,以降低費用。詳情請參閱「設定管道串流模式」。
管道排空
如果排空管道,系統就無法保證「僅限一次」語意。唯一保證的是不會遺失任何已確認的資料。因此,管道排空時可能會處理部分資料,但不會將讀取偏移量提交回 Kafka。如要在修改管道時,為 Kafka 達成至少一次的語意,請更新管道,而不是取消工作並啟動新工作。
調整 Kafka,確保「僅限一次」語意
調整 transaction.max.timeout.ms
和 transactional.id.expiration.ms
可輔助整體容錯和精確傳送一次策略。不過,影響程度取決於中斷性質和您的特定設定。將 transaction.max.timeout.ms
設為接近 Kafka 主題的保留時間,避免 Kafka 代理程式中斷導致資料重複。
如果 Kafka 代理程式暫時無法使用 (例如因網路分割或節點故障),且生產者有進行中的交易,這些交易可能會逾時。調高 transaction.max.timeout.ms
的值,可讓交易在代理程式復原後有更多時間完成,可能避免重新啟動交易和重新傳送訊息。這項緩解措施可減少因交易重新啟動而導致重複訊息的機率,間接協助維持「只傳送一次」的語意。另一方面,較短的到期時間有助於更快清除閒置的交易 ID,減少潛在的資源用量。
設定網路
根據預設,Dataflow 會在預設的虛擬私有雲 (VPC) 網路中啟動執行個體。視 Kafka 設定而定,您可能需要為 Dataflow 設定不同的網路和子網路。詳情請參閱指定網路和子網路。設定網路時,請建立防火牆規則,允許 Dataflow 工作站機器連線至 Kafka 代理程式。
如果您使用 VPC Service Controls,請將 Kafka 叢集放在 VPC Service Controls 範圍內,否則請將範圍延伸至獲授權的 VPN 或 Cloud Interconnect。
如果 Kafka 叢集部署在 Google Cloud外部,您必須在 Dataflow 和 Kafka 叢集之間建立網路連線。我們提供多種網路選項,各有優缺點:
- 使用共用 RFC 1918 位址空間連線,方法如下:
- 透過公開 IP 位址存取外部代管的 Kafka 叢集,方法如下:
專屬互連是兼顧可預測效能和可靠性的最佳選項,但由於第三方廠商必須佈建新電路,因此可能要花費較長時間才能建置。使用公開 IP 型拓撲,您可以迅速開始使用,因為只需要很少的網路工作。
接下來兩節會詳細說明這些選項。
共用 RFC 1918 位址空間
專屬互連和 IPsec VPN 都能讓您直接存取虛擬私有雲 (VPC) 中的 RFC 1918 IP 位址,這樣可以簡化 Kafka 設定。如果您使用 VPN 型拓撲,請考慮建置高總處理量 VPN。
根據預設,Dataflow 會在預設虛擬私有雲網路上啟動執行個體。在一個具有在雲端路由器中明確定義的路徑,可讓 Google Cloud 中的子網路連接到該 Kafka 叢集的私人網路拓撲中,您需要對如何找到 Dataflow 執行個體擁有更多的控制權。您可以使用 Dataflow 設定 network
和 subnetwork
執行參數。
請確認當 Dataflow 嘗試擴展時,對應的子網路可以提供足夠的 IP 位址,以啟動執行個體。此外,當您建立獨立網路,以啟動 Dataflow 執行個體時,請確認您擁有的防火牆規則,可以啟動該專案中所有虛擬機器之間的 TCP 流量。預設網路已經設定了這樣的防火牆規則。
公開 IP 位址空間
這個架構使用傳輸層安全標準 (TLS) 保護外部用戶端和 Kafka 之間的流量,並使用未加密的流量進行代理器之間的通訊。當 Kafka 偵聽器繫結到用於內部和外部通訊的網路介面時,設定偵聽器就很簡單。然而在許多情況下,叢集內 Kafka 代理器的外部通告位址與 Kafka 使用的內部網路介面並不相同。在這種情況下,您可以使用 advertised.listeners
屬性:
# Configure protocol map listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093
外部用戶端透過「SSL」管道使用 9093 通訊埠進行連線,內部用戶端則透過明文管道使用 9092 通訊埠進行連線。當您在 advertised.listeners
下指定位址時,請使用外部和內部流量都可以解析到同一執行個體的 DNS 名稱 (在此樣本中為 kafkabroker-n.mydomain.com
)。使用公開 IP 位址可能行不通,因為對於內部流量而言,該位址可能無法解析。
記錄
KafkaIO
的記錄可能相當詳細。建議在正式版中降低記錄層級,如下所示:
sdkHarnessLogLevelOverrides='{"org.apache.kafka.clients.consumer.internals.SubscriptionState":"WARN"}'.
詳情請參閱「設定管道工作站記錄層級」。
後續步驟
- 從 Apache Kafka 讀取資料。
- 進一步瞭解受管理 I/O。