本文說明如何從 Apache Kafka 讀取資料至 Dataflow,並提供效能提示和最佳做法。
在大多數情況下,建議使用受管理 I/O 連接器從 Kafka 讀取資料。
如需更進階的效能調整,請考慮使用 KafkaIO
連接器。KafkaIO
連接器適用於 Java,或透過 多語言管道架構用於 Python 和 Go。
平行處理工作數量
平行處理量受兩個因素限制:工作站數量上限 (max_num_workers
) 和 Kafka 分區數量。Dataflow 預設的平行處理扇出為 4 x max_num_workers
。不過,扇出會受限於分區數量。舉例來說,假設有 100 個 vCPU 可用,但管道只從 10 個 Kafka 分區讀取資料,則平行處理上限為 10。
為盡量提高平行處理量,建議至少要有 4 個 Kafka 分區。max_num_workers
如果作業使用 Runner v2,請考慮將平行處理設定得更高。建議您先將分區數量設為工作站 vCPU 數量的兩倍。
如果無法增加分區數量,可以呼叫 KafkaIO.Read.withRedistribute
來提高平行處理能力。這個方法會將 Redistribute
轉換新增至管道,為 Dataflow 提供提示,以便更有效率地重新分配資料並進行平行處理。強烈建議您呼叫 KafkaIO.Read.withRedistributeNumKeys
,指定最佳分片數量。單獨使用 KafkaIO.Read.withRedistribute
可能會產生大量金鑰,導致效能問題。詳情請參閱「找出平行處理程度高的階段」。重新分配資料會增加一些額外負擔,以執行重組步驟。詳情請參閱「防止融合」。
請盡量確保各個分割區之間的負載相對平均,不會出現偏差。如果負載不平均,可能會導致工作人員利用率不佳。從負載較輕的分區讀取資料的工作站可能相對閒置,而從負載較重的分區讀取資料的工作站可能落後。Dataflow 會提供每個分割區積壓工作的指標。
如果負載不平均,動態工作平衡功能有助於分配工作。舉例來說,Dataflow 可能會分配一個工作站從多個低容量分割區讀取資料,並分配另一個工作站從單一高容量分割區讀取資料。不過,兩個工作人員無法從同一個分割區讀取資料,因此負載過重的分割區仍可能導致管道落後。
最佳做法
本節提供從 Kafka 讀取資料至 Dataflow 的建議。
低搜尋量主題
常見情境是同時從多個低容量主題讀取資料,例如每個客戶一個主題。為每個主題建立個別的 Dataflow 工作,會造成成本效益不彰,因為每項工作至少需要一個完整的工作站。請改用下列選項:
合併主題:在主題擷取至 Dataflow 之前,請先合併主題。與其匯入大量低搜尋量的主題,不如匯入少量高搜尋量的主題,這樣效率會高得多。每個高用量主題都可以由單一 Dataflow 工作處理,充分運用工作站。
閱讀多個主題。如果無法在將主題擷取到 Dataflow 之前合併,建議您建立可從多個主題讀取資料的管道。這個方法可讓 Dataflow 將多個主題指派給同一個工作站。您可以透過下列兩種方式導入這項做法:
單一讀取步驟。建立
KafkaIO
連接器的單一執行個體,並設定為讀取多個主題。然後依主題名稱篩選,為每個主題套用不同邏輯。如需程式碼範例,請參閱「從多個主題讀取資料」。如果所有主題都位於同一個叢集,建議使用這個選項。缺點是單一接收器或轉換的問題可能會導致所有主題累積待處理項目。如要處理更進階的用途,請傳入一組
KafkaSourceDescriptor
物件,指定要讀取的項目。使用KafkaSourceDescriptor
可讓您日後視需要更新主題清單。這項功能需要搭配 Java 和 Runner v2。多個讀取步驟。如要從不同叢集中的主題讀取資料,管道可以包含多個
KafkaIO
執行個體。作業執行期間,您可以使用轉換對應更新個別來源。只有在使用 Runner v2 時,才能設定新主題或叢集。這種做法可能會造成可觀測性方面的問題,因為您必須監控每個個別的讀取轉換,而不是依賴管道層級的指標。
將資料提交回 Kafka
根據預設,KafkaIO
連接器不會使用 Kafka 偏移量追蹤進度,也不會回傳至 Kafka。如果您呼叫 commitOffsetsInFinalize
,連接器會在記錄於 Dataflow 中提交後,盡量提交回 Kafka。Dataflow 中已提交的記錄可能未完全處理,因此如果取消管道,系統可能會提交偏移量,但記錄從未完全處理。
由於設定 enable.auto.commit=True
會在從 Kafka 讀取偏移量後立即提交,而 Dataflow 不會進行任何處理,因此不建議使用這個選項。建議同時設定 enable.auto.commit=False
和 commitOffsetsInFinalize=True
。如果將 enable.auto.commit
設為 True
,管道在處理資料時若中斷,可能會導致資料遺失。Kafka 上已提交的記錄可能會遭到捨棄。
浮水印
根據預設,KafkaIO
連接器會使用目前的處理時間指派輸出浮水印和事件時間。如要變更這項行為,請呼叫 withTimestampPolicyFactory
並指派 TimestampPolicy
。Beam 提供 TimestampPolicy
的實作項目,可根據 Kafka 的記錄附加時間或訊息建立時間計算浮水印。
Runner 注意事項
KafkaIO
連接器有兩種 Kafka 讀取基礎實作方式:舊版 ReadFromKafkaViaUnbounded
和新版 ReadFromKafkaViaSDF
。Dataflow 會根據 SDK 語言和工作需求,自動為工作選擇最佳實作方式。除非需要特定功能,否則請避免明確要求執行器或 Kafka 實作項目。如要進一步瞭解如何選擇執行器,請參閱「使用 Dataflow Runner v2」。
如果管道使用 withTopic
或 withTopics
,舊版實作會在管道建構時查詢 Kafka,瞭解可用的分割區。建立管道的機器必須具備連線至 Kafka 的權限。如果收到權限錯誤訊息,請確認您有權在本機連線至 Kafka。如要避免這個問題,請使用 withTopicPartitions
,這個函式不會在管道建構期間連線至 Kafka。
部署至正式環境
在實際工作環境中部署解決方案時,建議使用彈性範本。使用 Flex 範本時,管道會從一致的環境啟動,有助於減輕本機設定問題。
KafkaIO
的記錄可能相當詳細。建議在正式版中降低記錄層級,如下所示:
sdkHarnessLogLevelOverrides='{"org.apache.kafka.clients.consumer.internals.SubscriptionState":"WARN"}'.
詳情請參閱「設定管道工作站記錄層級」。
設定網路
根據預設,Dataflow 會在預設的虛擬私有雲 (VPC) 網路中啟動執行個體。視 Kafka 設定而定,您可能需要為 Dataflow 設定不同的網路和子網路。詳情請參閱指定網路和子網路。設定網路時,請建立防火牆規則,允許 Dataflow 工作站機器連線至 Kafka 代理程式。
如果您使用 VPC Service Controls,請將 Kafka 叢集放在 VPC Service Controls 範圍內,否則請將範圍延伸至獲授權的 VPN 或 Cloud Interconnect。
如果 Kafka 叢集部署在 Google Cloud外部,您必須在 Dataflow 和 Kafka 叢集之間建立網路連線。我們提供多種網路選項,各有優缺點:
- 使用共用 RFC 1918 位址空間連線,方法如下:
- 透過公開 IP 位址存取外部代管的 Kafka 叢集,方法如下:
專屬互連是兼顧可預測效能和可靠性的最佳選項,但由於第三方廠商必須佈建新電路,因此可能要花費較長時間才能建置。使用公開 IP 型拓撲,您可以迅速開始使用,因為只需要很少的網路工作。
接下來兩節會詳細說明這些選項。
共用 RFC 1918 位址空間
專屬互連和 IPsec VPN 都能讓您直接存取虛擬私有雲 (VPC) 中的 RFC 1918 IP 位址,這樣可以簡化 Kafka 設定。如果您使用 VPN 型拓撲,請考慮建置高總處理量 VPN。
根據預設,Dataflow 會在預設虛擬私有雲網路上啟動執行個體。在一個具有在雲端路由器中明確定義的路徑,可讓 Google Cloud 中的子網路連接到該 Kafka 叢集的私人網路拓撲中,您需要對如何找到 Dataflow 執行個體擁有更多的控制權。您可以使用 Dataflow 設定 network
和 subnetwork
執行參數。
請確認當 Dataflow 嘗試擴展時,對應的子網路可以提供足夠的 IP 位址,以啟動執行個體。此外,當您建立獨立網路,以啟動 Dataflow 執行個體時,請確認您擁有的防火牆規則,可以啟動該專案中所有虛擬機器之間的 TCP 流量。預設網路已經設定了這樣的防火牆規則。
公開 IP 位址空間
這個架構使用傳輸層安全標準 (TLS) 保護外部用戶端和 Kafka 之間的流量,並使用未加密的流量進行代理器之間的通訊。當 Kafka 偵聽器繫結到用於內部和外部通訊的網路介面時,設定偵聽器就很簡單。然而在許多情況下,叢集內 Kafka 代理器的外部通告位址與 Kafka 使用的內部網路介面並不相同。在這種情況下,您可以使用 advertised.listeners
屬性:
# Configure protocol map listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093
外部用戶端透過「SSL」管道使用 9093 通訊埠進行連線,內部用戶端則透過明文管道使用 9092 通訊埠進行連線。當您在 advertised.listeners
下指定位址時,請使用外部和內部流量都可以解析到同一執行個體的 DNS 名稱 (在此樣本中為 kafkabroker-n.mydomain.com
)。使用公開 IP 位址可能行不通,因為對於內部流量而言,該位址可能無法解析。
調整 Kafka
Kafka 叢集和 Kafka 用戶端設定可能會大幅影響效能。請特別注意,下列設定可能過低。本節提供一些建議的起點,但您應針對特定工作負載實驗這些值。
unboundedReaderMaxElements
。預設值為 10,000。較高的值 (例如 100,000) 會增加套件的大小,如果管道包含匯總作業,效能就會大幅提升。不過,值越高,延遲時間也可能越長。如要設定值,請使用setUnboundedReaderMaxElements
。這項設定不適用於 Runner v2。unboundedReaderMaxReadTimeMs
。預設值為 10,000 毫秒。較高的值 (例如 20,000 毫秒) 會增加套件大小,而較低的值 (例如 5000 毫秒) 則可減少延遲或積壓工作。如要設定值,請使用setUnboundedReaderMaxReadTimeMs
。這項設定不適用於 Runner V2。max.poll.records
。預設值為 500。如果一次擷取更多傳入記錄,價值較高的記錄可能會有較好的成效,尤其是在使用 Runner v2 時。如要設定值,請呼叫withConsumerConfigUpdates
。fetch.max.bytes
。預設為 1 MB。提高這個值可能會減少要求數量,進而提升總處理量,尤其是在使用 Runner v2 時。不過,如果設定過高,可能會增加延遲時間,但下游處理程序更有可能成為主要瓶頸。建議起始值為 100 MB。 如要設定值,請呼叫withConsumerConfigUpdates
。max.partition.fetch.bytes
。預設為 1 MB。這個參數會設定伺服器傳回的每個分割區資料量上限。增加值可減少要求數量,進而提高總處理量,尤其是在使用 Runner v2 時。不過,如果設定過高,可能會增加延遲時間,但下游處理程序更有可能成為主要瓶頸。建議的起始值為 100 MB。如要設定值,請呼叫withConsumerConfigUpdates
。consumerPollingTimeout
。預設值為 2 秒。如果消費者端在讀取任何記錄前逾時,請嘗試設定較高的值。執行跨區域讀取作業或透過緩慢的網路讀取時,這項設定最為重要。如要設定值,請呼叫withConsumerPollingTimeout
。
請確保 receive.buffer.bytes
足以處理訊息大小。如果值太小,記錄可能會顯示消費者持續重新建立,並尋找特定位移。
範例
下列程式碼範例說明如何建立從 Kafka 讀取的 Dataflow 管道。搭配使用應用程式預設憑證和 Google Cloud Managed Service for Apache Kafka 提供的回呼處理常式時,需要 kafka-clients
3.7.0 以上版本。
從單一主題讀取
本範例使用受管理 I/O 連接器。本教學課程說明如何從 Kafka 主題讀取資料,並將訊息酬載寫入文字檔。
Java
如要向 Dataflow 進行驗證,請設定應用程式預設憑證。 詳情請參閱「為本機開發環境設定驗證」。
Python
如要向 Dataflow 進行驗證,請設定應用程式預設憑證。 詳情請參閱「為本機開發環境設定驗證」。
從多個主題讀取資料
本範例使用 KafkaIO
連接器。這項範例會說明如何從多個 Kafka 主題讀取資料,並為每個主題套用個別的管道邏輯。
如要瞭解更進階的用途,請動態傳入一組 KafkaSourceDescriptor
物件,以便更新要讀取的主題清單。這個方法需要使用 Java 和 Runner v2。
Java
如要向 Dataflow 進行驗證,請設定應用程式預設憑證。 詳情請參閱「為本機開發環境設定驗證」。
Python
如要向 Dataflow 進行驗證,請設定應用程式預設憑證。 詳情請參閱「為本機開發環境設定驗證」。
後續步驟
- 寫入 Apache Kafka。
- 進一步瞭解受管理 I/O。