Dataflow pipeline 最佳做法

本頁面列出開發 Dataflow 管道時的最佳做法。採用這些最佳做法有以下優點:

  • 提升管道觀測能力和效能
  • 提高開發人員工作效率
  • 提升管道測試能力

本頁的 Apache Beam 程式碼範例使用 Java,但內容適用於 Apache Beam Java、Python 和 Go SDK。

值得思考的問題

設計管道時,請考慮下列問題:

  • 管道的輸入資料儲存在哪裡?您有多少組輸入資料?
  • 您的資料為何?
  • 您希望如何處理資料?
  • 管道的輸出資料應傳輸至何處?
  • 您的 Dataflow 工作是否使用 Assured Workloads

使用範本

為加快管道開發速度,請盡可能使用 Dataflow 範本,而非編寫 Apache Beam 程式碼來建構管道。範本具有下列優點:

  • 範本可重複使用。
  • 您可以變更特定管道參數,自訂每項工作。
  • 只要您授權,任何人都能使用範本部署管道。 舉例來說,開發人員可以從範本建立工作,而機構的資料科學家稍後可以部署該範本。

你可以使用 Google 提供的範本,也可以自行建立範本。部分 Google 提供的範本可讓您將自訂邏輯新增為管道步驟。舉例來說,Pub/Sub 到 BigQuery 範本提供參數,可執行儲存在 Cloud Storage 中的 JavaScript 使用者定義函式 (UDF)。

由於 Google 提供的範本是依據 Apache 授權 2.0 版開放原始碼,因此您可以將這些範本做為新管道的基礎。範本也可用於程式碼範例。在 GitHub 存放區中查看範本程式碼。

Assured Workloads

Assured Workloads 可協助Google Cloud 客戶強制執行安全性與法規遵循要求。舉例來說,「具備主權控管機制的歐盟區域與支援服務」可協助確保歐盟客戶的資料落地權和資料主權。為提供這些功能,部分 Dataflow 功能會受到限制。如果您搭配使用 Assured Workloads 和 Dataflow,管道存取的所有資源都必須位於貴機構的 Assured Workloads 專案或資料夾中。這些資源包括:

  • Cloud Storage 值區
  • BigQuery 資料集
  • Pub/Sub 主題和訂閱項目
  • Firestore 資料集
  • I/O 連接器

在 Dataflow 中,如果是 2024 年 3 月 7 日後建立的串流工作,所有使用者資料都會以 CMEK 加密。

如果是 2024 年 3 月 7 日前建立的串流作業,以金鑰為基礎的作業 (如時間區間設定、分組和彙整) 中使用的資料金鑰不受 CMEK 加密保護。如要為工作啟用這項加密功能,請排除或取消工作,然後重新啟動工作。詳情請參閱「管道狀態構件加密」。

在管道之間共用資料

我們並未提供 Dataflow 專屬的跨管道通訊機制,讓您在管道之間共用資料或處理背景資訊。您可以使用 Cloud Storage 等耐用儲存空間或 App Engine 等記憶體內快取,在管道執行個體之間共用資料。

排定工作

您可以透過下列方式自動執行管道:

撰寫管道程式碼的最佳做法

下列各節提供最佳做法,協助您編寫 Apache Beam 程式碼來建立管道。

建構 Apache Beam 程式碼

如要建立管道,一般會使用通用的 ParDo 平行處理 Apache Beam 轉換。套用 ParDo 轉換時,您會以 DoFn 物件的形式提供程式碼。DoFn 是 Apache Beam SDK 類別,用於定義分散式處理函式。

您可以將 DoFn 程式碼想像成小型的獨立實體:多個執行個體可能會在不同的機器上執行,且彼此之間互不相干。因此,建議您建立「純虛擬函式」,這類函式非常適合 DoFn 元素的平行處理及分散特質。純函式具有下列特性:

  • 純函式不依附於隱藏或外部狀態。
  • 沒有可觀察到的副作用。
  • 確定性。

純函式模型並不嚴謹。只要程式碼不依賴於 Dataflow 服務無法保證的事物,狀態資訊或外部初始化資料對 DoFn 及其他函式物件也是有效的。

結構化 ParDo 轉換及建立 DoFn 元素時,請參考下列指南:

  • 使用一次性處理時,Dataflow 服務保證輸入 PCollection 中的每個元素都只會由 DoFn 執行個體「處理一次」
  • Dataflow 服務不保證 DoFn 的叫用次數。
  • Dataflow 服務不保證分布式元素的確切分組方式,無法保證系統會一起處理哪些元素。
  • Dataflow 服務不保證在整個管道過程中會確切建立多少個 DoFn 執行個體。
  • Dataflow 服務具有容錯功能,因此工作站發生問題時,可能會多次重試您的程式碼。
  • Dataflow 服務可能會為您的程式碼建立備份副本。手動作業可能會產生連帶影響,例如程式碼依賴於暫存檔案或建立的暫存檔案使用了重複名稱。
  • Dataflow 服務會以序列化的方式處理各 DoFn 執行個體的元素。您的程式碼不一定要是嚴謹的安全執行緒,但多個 DoFn 執行個體之間共用的狀態必須是執行緒安全的。

建立可重複使用的轉換作業程式庫

Apache Beam 程式設計模型可讓您重複使用轉換。建立通用轉換的共用程式庫,可讓不同團隊提高程式碼的重複使用率、可測試性及擁有權。

請參考下列兩個 Java 程式碼範例,兩者都會讀取付款事件。假設兩個管道執行的處理作業相同,則可透過共用程式庫,對其餘處理步驟使用相同的轉換。

第一個範例來自不受限的 Pub/Sub 來源:

PipelineOptions options = PipelineOptionsFactory.create();

Pipeline p = Pipeline.create(options)

// Initial read transform
PCollection<PaymentEvent> payments =
    p.apply("Read from topic",
        PubSubIO.readStrings().withTimestampAttribute(...).fromTopic(...))
        .apply("Parse strings into payment events",
            ParDo.of(new ParsePaymentEventFn()));

第二個範例來自有界關聯資料庫來源:

PipelineOptions options = PipelineOptionsFactory.create();

Pipeline p = Pipeline.create(options);

PCollection<PaymentEvent> payments =
    p.apply(
        "Read from database table",
        JdbcIO.<PaymentEvent>read()
            .withDataSourceConfiguration(...)
            .withQuery(...)
            .withRowMapper(new RowMapper<PaymentEvent>() {
              ...
            }));

實作程式碼重複使用最佳做法的方式,會因程式設計語言和建構工具而異。舉例來說,如果您使用 Maven,可以將轉換程式碼分成自己的模組。接著,您可以在較大的多模組專案中,將模組納入為不同管道的子模組,如以下程式碼範例所示:

// Reuse transforms across both pipelines
payments
    .apply("ValidatePayments", new PaymentTransforms.ValidatePayments(...))
    .apply("ProcessPayments", new PaymentTransforms.ProcessPayments(...))
  ...

詳情請參閱下列 Apache Beam 說明文件頁面:

使用無法傳送訊息的佇列處理錯誤

有時管道無法處理元素。資料問題是常見原因。 舉例來說,如果元素包含格式錯誤的 JSON,可能會導致剖析失敗。

雖然您可以在 DoFn.ProcessElement 方法中擷取例外狀況、記錄錯誤並捨棄元素,但這種做法會遺失資料,且無法在稍後檢查資料以進行手動處理或疑難排解。

請改用稱為死信佇列 (未處理訊息佇列) 的模式。 在 DoFn.ProcessElement 方法中擷取例外狀況,並記錄錯誤。請勿捨棄失敗的元素,而是使用分支輸出將失敗的元素寫入個別的 PCollection 物件。這些元素隨後會寫入資料接收器,以便稍後使用個別轉換檢查及處理。

下列 Java 程式碼範例說明如何實作無法傳送郵件的佇列模式。

TupleTag<Output> successTag = new TupleTag<>() {};
TupleTag<Input> deadLetterTag = new TupleTag<>() {};

PCollection<Input> input = /* ... */;

PCollectionTuple outputTuple =
    input.apply(ParDo.of(new DoFn<Input, Output>() {
      @Override
      void processElement(ProcessContext c) {
        try {
          c.output(process(c.element()));
        } catch (Exception e) {
          LOG.severe("Failed to process input {} -- adding to dead-letter file",
              c.element(), e);
          c.sideOutput(deadLetterTag, c.element());
        }
      }).withOutputTags(successTag, TupleTagList.of(deadLetterTag)));

// Write the dead-letter inputs to a BigQuery table for later analysis
outputTuple.get(deadLetterTag)
    .apply(BigQueryIO.write(...));

// Retrieve the successful elements...
PCollection<Output> success = outputTuple.get(successTag);
// and continue processing ...

使用 Cloud Monitoring,為管道的無效信件佇列套用不同的監控和快訊政策。舉例來說,您可以將死信轉換處理的元素數量和大小視覺化,並設定警報,在達到特定閾值條件時觸發警報。

處理結構定義變異

如要處理具有非預期但有效結構定義的資料,可以使用死信模式,將失敗的元素寫入個別的 PCollection 物件。在某些情況下,您希望自動將反映變異結構定義的元素視為有效元素。舉例來說,如果元素的結構定義反映了異動 (例如新增欄位),您可以調整資料接收器的結構定義,以配合異動。

自動結構定義變動功能會採用死信模式使用的分支輸出方法。不過,在這種情況下,每當遇到加法結構定義時,就會觸發轉換,進而改變目的地結構定義。如需這個方法的範例,請參閱這篇 Google Cloud 網誌文章,瞭解 Square Enix 如何在串流管道中處理變動的 JSON 結構定義。

決定如何合併資料集

彙整資料集是資料管道的常見用途。您可以使用側邊輸入或 CoGroupByKey 轉換,在管道中執行聯結。各有優缺點。

側邊輸入提供彈性的方式,解決常見的資料處理問題,例如資料擴充和鍵值查詢。與 PCollection 物件不同,側邊輸入內容可變動,且可在執行階段判斷。舉例來說,側邊輸入中的值可能由管道中的另一個分支版本計算,或透過呼叫遠端服務來判斷。

Dataflow 會將資料保存到永久儲存空間 (類似共用磁碟),藉此支援輔助輸入。這項設定會讓所有工作站都能存取完整的側邊輸入。

不過,側邊輸入的大小可能非常大,不適合工作站記憶體。如果工作站需要不斷從永久儲存空間讀取資料,從大型側邊輸入讀取資料可能會導致效能問題。

CoGroupByKey 轉換是核心 Apache Beam 轉換,可合併 (扁平化) 多個 PCollection 物件,並將具有相同鍵的元素分組。側向輸入會將整個側向輸入資料提供給每個工作站,但 CoGroupByKey 會執行隨機排列 (分組) 作業,將資料分配給各個工作站。因此,當您要聯結的 PCollection 物件非常大,且不適合工作站記憶體時,CoGroupByKey 就非常適合。

請按照以下準則,判斷是否要使用側邊輸入或 CoGroupByKey

  • 當您要彙整的 PCollection 物件中,有一個物件明顯小於其他物件,且較小的 PCollection 物件符合工作人員記憶體大小時,請使用側邊輸入。將側邊輸入完全快取到記憶體中,可快速有效率地擷取元素。
  • 如果 PCollection 物件必須在管道中多次聯結,請使用側邊輸入。不必使用多個 CoGroupByKey 轉換,只要建立單一側邊輸入,就能供多個 ParDo 轉換重複使用。
  • 如果需要擷取 PCollection 物件的大部分內容,且內容遠遠超出工作站記憶體容量,請使用 CoGroupByKey

詳情請參閱「排解 Dataflow 記憶體不足錯誤」。

盡量減少昂貴的元素作業

DoFn 執行個體會處理稱為「套件」的元素批次,這些是不可分割的工作單元,由零或多個元素組成。接著,系統會透過 DoFn.ProcessElement 方法處理個別元素,這個方法會針對每個元素執行。由於系統會為每個元素呼叫 DoFn.ProcessElement 方法,因此該方法叫用的任何耗時或運算成本高昂的作業,都會針對方法處理的每個元素執行。

如果只需要為一批元素執行一次耗用大量資源的作業,請將這些作業納入 DoFn.Setup 方法或 DoFn.StartBundle 方法,而不是 DoFn.ProcessElement 元素。例如:

  • 剖析設定檔,控管執行個體行為的某些層面。DoFn初始化 DoFn 執行個體時,請使用 DoFn.Setup 方法,只叫用這項動作一次。

  • 在套裝組合中的所有元素之間重複使用短期存續期用戶端,例如透過單一網路連線傳送套裝組合中的所有元素。使用 DoFn.StartBundle 方法,為每個套件組合叫用這項動作一次。

限制批次大小和對外部服務的並行呼叫

呼叫外部服務時,您可以使用 GroupIntoBatches 轉換,減少每次呼叫的額外負荷。這項轉換會建立指定大小的元素批次。 批次處理會將元素以單一酬載的形式傳送至外部服務,而非個別傳送。

搭配批次處理功能,選擇適當的金鑰來分割傳入資料,藉此限制對外部服務的平行 (並行) 呼叫次數上限。分區數量決定了平行處理的最大程度。舉例來說,如果每個元素都使用相同的鍵,則呼叫外部服務的下游轉換不會平行執行。

請考慮採用下列其中一種方法,為元素產生鍵:

  • 選擇要當做資料鍵的資料集屬性,例如使用者 ID。
  • 產生資料鍵,將元素隨機分割到固定數量的分區中,可能鍵值的數量決定分區數量。您需要建立足夠的分區,才能進行平行處理。 每個分區都必須有足夠的元素,GroupIntoBatches 轉換才能發揮作用。

下列 Java 程式碼範例說明如何將元素隨機分割到十個分割區:

// PII or classified data which needs redaction.
PCollection<String> sensitiveData = ...;

int numPartitions = 10; // Number of parallel batches to create.
PCollection<KV<Long, Iterable<String>>> batchedData =
    sensitiveData
        .apply("Assign data into partitions",
            ParDo.of(new DoFn<String, KV<Long, String>>() {
              Random random = new Random();

              @ProcessElement
              public void assignRandomPartition(ProcessContext context) {
                context.output(
                  KV.of(randomPartitionNumber(), context.element()));
              }
              private static int randomPartitionNumber() {
                return random.nextInt(numPartitions);
              }
            }))
        .apply("Create batches of sensitive data",
            GroupIntoBatches.<Long, String>ofSize(100L));

// Use batched sensitive data to fully utilize Redaction API,
// which has a rate limit but allows large payloads.
batchedData
    .apply("Call Redaction API in batches", callRedactionApiOnBatch());

找出因融合步驟而導致的效能問題

Dataflow 會根據您用來建構管道的轉換和資料,建立代表管道的步驟圖。這張圖稱為「管道執行圖」

部署管道時,Dataflow 可能會修改管道的執行圖,以提升效能。舉例來說,Dataflow 可能會將某些作業合併在一起 (這個程序稱為融合最佳化),避免在管道中寫入每個中繼 PCollection 物件,進而影響效能和成本。

有時候,Dataflow 可能無法準確判斷融合管道作業的最佳方式,而使得工作無法充分運用所有可用的工作站。在這種情況下,您可以防止作業融合。

請參考以下 Apache Beam 程式碼範例。GenerateSequence 轉換會建立小型有界 PCollection 物件,然後由兩個下游 ParDo 轉換進一步處理。

Find Primes Less-than-N 轉換的運算成本可能很高,因此處理大量數字時可能會很慢。相較之下,Increment Number 轉換可能很快就會完成。

import com.google.common.math.LongMath;
...

public class FusedStepsPipeline {

  final class FindLowerPrimesFn extends DoFn<Long, String> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      Long n = c.element();
      if (n > 1) {
        for (long i = 2; i < n; i++) {
          if (LongMath.isPrime(i)) {
            c.output(Long.toString(i));
          }
        }
      }
    }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(options);

    PCollection<Long> sequence = p.apply("Generate Sequence",
        GenerateSequence
            .from(0)
            .to(1000000));

    // Pipeline branch 1
    sequence.apply("Find Primes Less-than-N",
        ParDo.of(new FindLowerPrimesFn()));

    // Pipeline branch 2
    sequence.apply("Increment Number",
        MapElements.via(new SimpleFunction<Long, Long>() {
          public Long apply(Long n) {
            return ++n;
          }
        }));

    p.run().waitUntilFinish();
  }
}

下圖顯示 Dataflow 監控介面中的管道圖示。

Dataflow 介面中的管道流程表示法。

Dataflow 監控介面顯示,這兩個轉換作業的處理速度同樣緩慢,具體來說是每秒 13 個元素。您可能會預期 Increment Number 轉換會快速處理元素,但實際上,這項轉換的處理速率似乎與 Find Primes Less-than-N 相同。

這是因為 Dataflow 將這些步驟融合為單一階段,因此無法獨立執行。您可以使用 gcloud dataflow jobs describe 指令尋找更多資訊:

gcloud dataflow jobs describe --full job-id --format json

在產生的輸出內容中,融合的步驟會說明於 ComponentTransform 陣列的 ExecutionStageSummary 物件中:

...

    "executionPipelineStage": [
      {
        "componentSource": [
          ...
        ],
        "componentTransform": [
          {
            "name": "s1",
            "originalTransform": "Generate Sequence/Read(BoundedCountingSource)",
            "userName": "Generate Sequence/Read(BoundedCountingSource)"
          },
          {
            "name": "s2",
            "originalTransform": "Find Primes Less-than-N",
            "userName": "Find Primes Less-than-N"
          },
          {
            "name": "s3",
            "originalTransform": "Increment Number/Map",
            "userName": "Increment Number/Map"
          }
        ],
        "id": "S01",
        "kind": "PAR_DO_KIND",
        "name": "F0"
      }

...

在這種情況下,由於 Find Primes Less-than-N 轉換是緩慢的步驟,因此在該步驟之前中斷融合是適當的策略。如要取消融合步驟,其中一種方法是在步驟前插入 GroupByKey 轉換並取消分組,如以下 Java 程式碼範例所示。

sequence
    .apply("Map Elements", MapElements.via(new SimpleFunction<Long, KV<Long, Void>>() {
      public KV<Long, Void> apply(Long n) {
        return KV.of(n, null);
      }
    }))
    .apply("Group By Key", GroupByKey.<Long, Void>create())
    .apply("Emit Keys", Keys.<Long>create())
    .apply("Find Primes Less-than-N", ParDo.of(new FindLowerPrimesFn()));

您也可以將這些取消融合步驟合併為可重複使用的複合轉換。

取消融合步驟後,當您執行管道時,Increment Number會在幾秒內完成,而執行時間較長的 Find Primes Less-than-N 轉換會在另一個階段執行。

這個範例會對未合併的步驟套用群組和取消群組作業。 在其他情況下,您可以使用其他方法。在本例中,由於 GenerateSequence 轉換的連續輸出,處理重複輸出並非問題。KV 具有重複鍵的物件會在群組 (GroupByKey) 轉換中重複資料刪除為單一鍵,並在取消分組 (Keys) 轉換中重複資料刪除。如要在群組和取消群組作業後保留重複項目,請按照下列步驟建立鍵/值組合:

  1. 使用隨機金鑰,並將原始輸入內容做為值。
  2. 使用隨機金鑰分組。
  3. 針對每個鍵發出值做為輸出。

您也可以使用 Reshuffle 轉換,避免周圍的轉換融合。不過,Reshuffle轉換的副作用無法在不同 Apache Beam 執行器之間移植。

如要進一步瞭解平行處理和融合最佳化,請參閱「管道生命週期」。

使用 Apache Beam 指標收集管道洞察資料

Apache Beam 指標是一種公用程式類別,可產生指標來回報執行中管道的屬性。使用 Cloud Monitoring 時,Apache Beam 指標會以 Cloud Monitoring 自訂指標的形式提供。

下列範例顯示 DoFn 子類別中使用的 Apache Beam Counter 指標

範例程式碼使用兩個計數器。一個計數器會追蹤 JSON 剖析失敗次數 (malformedCounter),另一個計數器則會追蹤 JSON 訊息是否有效,但包含空白的酬載 (emptyCounter)。在 Cloud Monitoring 中,自訂指標名稱為 custom.googleapis.com/dataflow/malformedJsoncustom.googleapis.com/dataflow/emptyPayload。您可以在 Cloud Monitoring 中使用自訂指標建立視覺化內容和快訊政策。

final TupleTag<String> errorTag = new TupleTag<String>(){};
final TupleTag<MockObject> successTag = new TupleTag<MockObject>(){};

final class ParseEventFn extends DoFn<String, MyObject> {

  private final Counter malformedCounter = Metrics.counter(ParseEventFn.class, "malformedJson");
  private final Counter emptyCounter = Metrics.counter(ParseEventFn.class, "emptyPayload");
  private Gson gsonParser;

  @Setup
  public setup() {
    gsonParser = new Gson();
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    try {
      MyObject myObj = gsonParser.fromJson(c.element(), MyObject.class);
      if (myObj.getPayload() != null) {
        //  Output the element if non-empty payload
        c.output(successTag, myObj);
      }
      else {
        // Increment empty payload counter
        emptyCounter.inc();
      }
    }
    catch (JsonParseException e) {
      // Increment malformed JSON counter
      malformedCounter.inc();
      // Output the element to dead-letter queue
      c.output(errorTag, c.element());
    }
  }
}

瞭解詳情

以下頁面提供更多資訊,說明如何建構管道、如何選擇要套用至資料的轉換作業,以及選擇管道的輸入和輸出方法時應考量的因素。

如要進一步瞭解如何建構使用者程式碼,請參閱使用者提供函式相關規定