開發及測試 Dataflow 管道

本頁提供開發及測試 Dataflow 管道的最佳做法。

總覽

管道程式碼的實作方式會對管道在實際工作環境中的效能產生重大影響。為協助您建立正確且有效率的管道程式碼,本文將說明下列事項:

  • 管道執行器,可在開發和部署的不同階段支援程式碼執行作業。
  • 部署環境,讓您在開發、測試、預先發布和正式發布期間執行管道。
  • 開放原始碼管道程式碼和範本,可直接使用,或做為新管道的基礎,加速程式碼開發。
  • 測試管道程式碼的最佳做法。首先,這份文件會提供總覽,包括不同測試類型 (例如單元測試、整合測試和端對端測試) 的範圍和關係。其次,我們會詳細探討每種測試,包括建立及整合測試資料的方法,以及每項測試要使用的管道執行器。

管道執行器

在開發和測試期間,您會使用不同的 Apache Beam 執行器來執行管道程式碼。Apache Beam SDK 提供直接執行器,方便您在本機開發及測試。發布自動化工具也可以使用 Direct Runner 進行單元測試和整合測試。舉例來說,您可以在持續整合 (CI) 管道中使用 Direct Runner。

部署至 Dataflow 的管道會使用 Dataflow Runner,在類似於實際工作環境的環境中執行管道。此外,您可以使用 Dataflow Runner 進行臨時開發測試,以及端對端管道測試。

雖然本頁著重於執行使用 Apache Beam Java SDK 建構的管道,但 Dataflow 也支援使用 Python 和 Go 開發的 Apache Beam 管道。Apache Beam Java、Python 和 Go SDK 已全面支援 Dataflow。SQL 開發人員也可以使用 Apache Beam SQL,建立使用熟悉 SQL 方言的管道。

設定部署環境

如要在不同開發階段區隔使用者、資料、程式碼和其他資源,請建立部署環境。如要為管道開發的不同階段提供獨立環境,請盡可能使用不同的Google Cloud 專案

以下各節說明典型的部署環境組合。

本機環境

本機環境是指開發人員的工作站。如要進行開發和快速測試,請使用 Direct Runner 在本機執行管道程式碼。

使用 Direct Runner 在本機執行的管道可以與遠端 Google Cloud 資源互動,例如 Pub/Sub 主題或 BigQuery 資料表。為每位開發人員提供個別專案,讓他們擁有沙箱,可使用服務進行臨時測試。Google Cloud Google Cloud

部分 Google Cloud 服務 (例如 Pub/SubBigtable) 提供本機開發模擬器。您可以搭配 Direct Runner 使用這些模擬器,進行端對端本機開發和測試。

沙箱環境

沙箱環境是 Google Cloud 專案,可讓開發人員在開發程式碼時存取 Google Cloud 服務。Pipeline 開發人員可以與其他開發人員共用 Google Cloud 專案,也可以使用自己的個別專案。使用個別專案可降低與共用資源用量和配額管理相關的規劃複雜度。

開發人員可使用沙箱環境,透過 Dataflow Runner 執行臨時管道。在程式碼開發階段,沙箱環境有助於針對正式版執行器偵錯及測試程式碼。舉例來說,開發人員可以透過臨時管道執行,完成下列工作:

  • 觀察程式碼變更對調度行為的影響。
  • 瞭解 Direct Runner 和 Dataflow Runner 行為的潛在差異。
  • 瞭解 Dataflow 如何套用圖形最佳化

如要進行臨時測試,開發人員可以從本機環境部署程式碼,以便在沙箱環境中執行 Dataflow。

前置製作環境

預先發布環境適用於需要在類似實際工作環境的條件下執行的開發階段,例如端對端測試。為前置製作環境使用獨立專案,並盡可能將其設定為與正式版相似。同樣地,如要以類似正式環境的規模進行端對端測試,請盡可能讓 Dataflow 和其他服務的專案配額與正式環境相近。 Google Cloud

您可以視需求將前置製作階段進一步劃分為多個環境。舉例來說,品質控管環境可支援品質分析師的工作,在不同工作負載條件下測試服務等級目標 (SLO),例如資料正確性、即時性和效能。

端對端測試 包括與測試範圍內資料來源和接收器的整合。 請考慮如何在前置製作環境中提供這些項目。您可以在前置製作環境中儲存測試資料。舉例來說,測試資料會與輸入資料一起儲存在 Cloud Storage 值區中。在其他情況下,測試資料可能來自前置製作環境外部,例如透過獨立訂閱項目 (位於實際執行環境中) 的 Pub/Sub 主題。如果是串流管道,您也可以使用產生的資料執行端對端測試,例如使用 Dataflow Streaming Data Generator 模擬類似於實際環境的資料特徵和量。

如果是串流管道,請先在試產環境中測試管道更新,再對正式環境進行任何變更。請務必測試及驗證串流管道的更新程序,特別是需要協調多個步驟時,例如執行平行管道來避免停機。

正式環境

實際工作環境是專屬 Google Cloud 專案。 所有端對端測試通過後,持續推送軟體更新會將部署構件複製到實際工作環境。

開發最佳做法

請參閱 Dataflow 管道最佳做法

測試管道

在軟體開發中,單元測試、整合測試和端對端測試是常見的軟體測試類型。這些測試類型也適用於資料管道。

Apache Beam SDK 提供啟用這些測試的功能。理想情況下,每種測試類型都會以不同的部署環境為目標。下圖說明單元測試、整合測試和端對端測試如何套用至管道和資料的不同部分。

測試類型,以及這些類型與轉換、管道、資料來源和資料接收器的關係。

下圖顯示不同測試的範圍,以及這些測試與轉換 (DoFnPTransform 子類別)、管道、資料來源和資料接收器的關係。

以下各節說明各種正式軟體測試如何套用至使用 Dataflow 的資料管道。閱讀本節時,請參閱圖表,瞭解不同類型的測試之間的關係。

資料取樣

如要觀察資料在 Dataflow 管道每個步驟中的狀態,請在測試期間啟用資料取樣功能。這樣您就能查看轉換的輸出內容,確保輸出內容正確無誤。

單元測試

單元測試會比較轉換的輸出內容與經過驗證的資料輸入和輸出內容集,評估 DoFn 子類別和複合轉換 (PTransform 子類別) 是否正常運作。一般而言,開發人員可以在本機環境中執行這些測試。您也可以在建構環境中,透過持續整合 (CI) 進行單元測試自動化,自動執行測試。

Direct Runner 會使用較小的參考測試資料子集執行單元測試,著重於測試轉換的業務邏輯。測試資料必須夠小,才能放入執行測試的電腦本機記憶體。

Apache Beam SDK 提供名為 TestPipeline 的 JUnit 規則,可供您單元測試個別轉換 (DoFn 子類別)、複合轉換 (PTransform 子類別) 和整個管道。您可以在 Apache Beam 管道執行器 (例如 Direct Runner 或 Dataflow Runner) 上使用 TestPipeline,透過 PAssertPCollection 物件的內容套用斷言,如以下 JUnit 測試類別的程式碼片段所示:

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void myPipelineTest() throws Exception {
  final PCollection<String> pcol = p.apply(...)
  PAssert.that(pcol).containsInAnyOrder(...);
  p.run();
}

個別轉換的單元測試

舉例來說,您可以將程式碼分解為可重複使用的轉換,例如頂層或靜態巢狀類別,針對管道的不同部分建立目標測試。除了測試的好處之外,可重複使用的轉換也會將管道的業務邏輯自然封裝到元件中,進而提升程式碼的可維護性和可重複使用性。相反地,如果管道使用匿名內部類別實作轉換,測試管道的個別部分可能會有困難。

下列 Java 程式碼片段顯示以匿名內部類別實作的轉換,這類轉換不容易測試。

PipelineOptions options = PipelineOptionsFactory.create();

Pipeline p = Pipeline.create(options)

PCollection<Integer> output =
    p.apply("Read from text", TextIO.Read.from(...))
        .apply("Split words", ParDo.of(new DoFn() {
          // Untestable anonymous transform 1
        }))
        .apply("Generate anagrams", ParDo.of(new DoFn() {
          // Untestable anonymous transform 2
        }))
        .apply("Count words", Count.perElement());

請比較上一個範例與下一個範例,其中匿名內部類別會重構為具名的具體 DoFn 子類別。您可以為構成端對端管道的每個具體 DoFn 子類別建立個別單元測試。

PipelineOptions options = PipelineOptionsFactory.create();

Pipeline p = Pipeline.create(options)

PCollection<Integer> output =
    p.apply("Read from text", TextIO.Read.from(...))
        .apply("Split words", ParDo.of(new SplitIntoWordsFn()))
        .apply("Generate anagrams", ParDo.of(new GenerateAnagramsFn()))
        .apply("Count words", Count.perElement());

測試每個 DoFn 子類別,與單元測試包含單一轉換的批次管道類似。使用 Create 轉換建立測試資料的 PCollection 物件,然後傳遞至 DoFn 物件。使用 PAssert 斷言 PCollection 物件的內容是否正確。下列 Java 程式碼範例使用 PAssert 類別檢查輸出格式是否正確。

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testGenerateAnagramsFn() {
    // Create the test input
    PCollection<String> words = p.apply(Create.of("friend"));

    // Test a single DoFn using the test input
    PCollection<String> anagrams =
        words.apply("Generate anagrams", ParDo.of(new GenerateAnagramsFn()));

    // Assert correct output from
    PAssert.that(anagrams).containsInAnyOrder(
        "finder", "friend", "redfin", "refind");

    p.run();
}

整合測試

整合測試會驗證整個管道是否正常運作。請考慮下列整合測試類型:

  • 轉換整合測試:評估構成資料管道的所有個別轉換作業的整合功能。您可以將轉換整合測試視為整個管道的單元測試,但不包括與外部資料來源和接收器的整合。Apache Beam SDK 提供的方法可將測試資料提供給資料管道,並驗證處理結果。Direct Runner 用於執行轉換整合測試。
  • 系統整合測試,用於評估資料管道與即時資料來源和接收器的整合情形。如要讓管道與外部系統通訊,您需要使用適當的憑證設定測試,才能存取外部服務。串流管道會無限期執行,因此您需要決定何時及如何停止執行中的管道。使用 Direct Runner 執行系統整合測試,即可快速驗證管道與其他系統之間的整合,不必提交 Dataflow 工作並等待完成。

設計轉換和系統整合測試,以便快速偵測缺點並提供意見回饋,同時不影響開發人員的生產力。如果是長時間執行的測試 (例如以 Dataflow 工作形式執行的測試),您可能需要使用執行頻率較低的端對端測試。

您可以將資料管道視為一或多個相關轉換。您可以為管道建立封裝複合轉換,並使用 TestPipeline 執行整個管道的整合測試。視要以批次或串流模式測試管道而定,您可以使用 CreateTestStream 轉換提供測試資料。

使用測試資料進行整合測試

在實際工作環境中,您的管道可能會整合不同的資料來源和接收器。不過,對於單元測試和轉換整合測試,請著重於提供測試輸入內容並直接驗證輸出內容,藉此驗證管道程式碼的業務邏輯。除了簡化測試外,這種做法還能將管道專屬問題與資料來源和接收器可能導致的問題區隔開來。

測試批次管道

如果是批次管道,請使用 Create 轉換,從標準記憶體內集合 (例如 Java List 物件) 建立輸入測試資料的 PCollection 物件。如果測試資料夠小,可以納入程式碼,就適合使用 Create 轉換。然後,您可以使用輸出 PCollection 物件上的 PAssert,判斷管道程式碼是否正確。Direct Runner 和 Dataflow Runner 都支援這種做法。

下列 Java 程式碼片段會針對複合轉換的輸出 PCollection 物件進行判斷提示,其中包含構成管道的部分或所有個別轉換 (WeatherStatsPipeline)。這種做法與管道中個別轉換的單元測試類似。

private class WeatherStatsPipeline extends
    PTransform<PCollection<Integer>, PCollection<WeatherSummary>> {
  @Override
  public PCollection<WeatherSummary> expand(PCollection<Integer> input) {
    // Pipeline transforms 
  }
}

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testWeatherPipeline() {
  // Create test input consisting of temperature readings
  PCollection<Integer> tempCelsius =
      p.apply(Create.of(24, 22, 20, 22, 21, 21, 20));

  // CalculateWeatherStats calculates the min, max, and average temperature
  PCollection<WeatherSummary> result =
      tempCelsius.apply("Calculate weather statistics", new WeatherStatsPipeline());

   // Assert correct output from CalculateWeatherStats
   PAssert.thatSingleton(result).isEqualTo(new WeatherSummary.Builder()
       .withAverageTemp(21)
       .withMaxTemp(24)
       .withMinTemp(20)
       .build());

   p.run();
}

如要測試視窗化行為,您也可以使用 Create 轉換來建立含有時間戳記的元素,如下列程式碼片段所示:

private static final Duration WINDOW_DURATION = Duration.standardMinutes(3);

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testWindowedData() {
    PCollection<String> input =
        p.apply(
            Create.timestamped(
                    TimestampedValue.of("a", new Instant(0L)),
                    TimestampedValue.of("a", new Instant(0L)),
                    TimestampedValue.of("b", new Instant(0L)),
                    TimestampedValue.of("c", new Instant(0L)),
                    TimestampedValue.of("c", new Instant(0L).plus(WINDOW_DURATION)))
                .withCoder(StringUtf8Coder.of()));

   PCollection<KV<String, Long>> windowedCount =
       input
           .apply(Window.into(FixedWindows.of(WINDOW_DURATION)))
           .apply(Count.perElement());

    PAssert.that(windowedCount)
        .containsInAnyOrder(
            // Output from first window
            KV.of("a", 2L),
            KV.of("b", 1L),
            KV.of("c", 1L),
            // Output from second window
            KV.of("c", 1L));

   p.run();
}

測試串流管道

串流管道包含假設,用於定義如何處理無界資料。這些假設通常與實際情況下資料的即時性有關,因此假設是否成立會影響正確性。串流管道的整合測試最好包含模擬串流資料抵達時不確定性的測試。

如要啟用這類測試,Apache Beam SDK 提供 TestStream 類別,可模擬元素時間 (提早、準時或延遲的資料) 對資料管道結果的影響。搭配 PAssert 類別使用這些測試,即可驗證是否符合預期結果。

Direct Runner 和 Dataflow Runner 均支援 TestStream。下列程式碼範例會建立 TestStream 轉換:

final Duration WINDOW_DURATION = Duration.standardMinutes(3);

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testDroppedLateData() {
   TestStream<String> input = TestStream.create(StringUtf8Coder.of())
      // Add elements arriving before the watermark
      .addElements(
         TimestampedValue.of("a", new Instant(0L)),
         TimestampedValue.of("a", new Instant(0L)),
         TimestampedValue.of("b", new Instant(0L)),
         TimestampedValue.of("c", new Instant(0L).plus(Duration.standardMinutes(3))))
         // Advance the watermark past the end of the window
      .advanceWatermarkTo(new Instant(0L).plus(WINDOW_DURATION).plus(Duration.standardMinutes(1)))
      // Add elements which will be dropped due to lateness
      .addElements(
         TimestampedValue.of("c", new Instant(0L)))
      // Advance the watermark to infinity which will close all windows
      .advanceWatermarkToInfinity();

      PCollection<KV<String, Long>> windowedCount =
          p.apply(input)
             .apply(Window.into(FixedWindows.of(WINDOW_DURATION)))
             .apply(Count.perElement());

   PAssert.that(windowedCount)
      .containsInAnyOrder(
          // Output from first window
          KV.of("a", 2L),
          KV.of("b", 1L),
          KV.of("c", 1L));

   p.run();
}

如要進一步瞭解 TestStream,請參閱「在 Apache Beam 中測試無界限管道」。如要進一步瞭解如何使用 Apache Beam SDK 進行單元測試,請參閱 Apache Beam 說明文件

在整合測試中使用 Google Cloud 服務

Direct Runner 可以與 Google Cloud 服務整合,因此本機環境中的臨時測試和系統整合測試,可以視需要使用 Pub/Sub、BigQuery 和其他服務。使用 Direct Runner 時,管道會使用應用程式預設憑證 (ADC) 取得憑證。設定 ADC 的方式取決於管道的執行位置。詳情請參閱「設定應用程式預設憑證」。

執行管道前,請務必為管道使用的帳戶授予任何必要資源的足夠權限。詳情請參閱「Dataflow 安全性與權限」。

如要進行完全在本機執行的整合測試,您可以針對部分Google Cloud 服務使用本機模擬器。您可以使用 Pub/SubBigtable 的本機模擬器。

如要對串流管道進行系統整合測試,可以使用 setBlockOnRun 方法 (在 DirectOptions 介面中定義),讓 Direct Runner 非同步執行管道。否則,管道執行作業會封鎖呼叫父項程序 (例如建構管道中的指令碼),直到管道手動停止為止。如果以非同步方式執行管道,可以使用傳回的 PipelineResult 執行個體取消管道執行作業,如下列程式碼範例所示:

public interface StreamingIntegrationTestOptions extends
   DirectOptions, StreamingOptions, MyOtherPipelineOptions {
   ...
}

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testNonBlockingPipeline() {
    StreamingIntegrationTestOptions options =
        p.getOptions().as(StreamingIntegrationOptions.class);

    options.setBlockOnRun(false); // Set non-blocking pipeline execution
    options.setStreaming(true); // Set streaming mode

    p.apply(...); // Apply pipeline transformations

    PipelineResult result = p.run(); // Run the pipeline

    // Generate input, verify output, etc
    ...

    // Later on, cancel the pipeline using the previously returned
    result.cancel();
}

端對端測試

端對端測試會在與實際工作環境相似的條件下,於 Dataflow Runner 上執行端對端管道,驗證管道是否正常運作。這些測試會使用 Dataflow Runner 驗證業務邏輯函式是否正確,並測試管道在類似於實際環境的負載下是否能正常運作。您通常會在專屬 Google Cloud 專案中執行端對端測試,該專案會指定為前置製作環境。

如要測試不同規模的管道,請使用不同類型的端對端測試,例如:

  • 使用一小部分 (例如 1%) 的測試資料集執行小規模的端對端測試,在預先發布環境中快速驗證管道功能。
  • 使用完整測試資料集執行大規模端對端測試,在類似於實際運作的資料量和條件下,驗證管道功能。

如果是串流管道,建議您並行執行測試管道和正式管道 (如果兩者可以使用相同資料)。這個程序可讓您比較結果和作業行為,例如自動調整資源配置和效能。

端對端測試有助於預測管道是否能達到生產環境的服務等級目標。前置製作環境會模擬正式版環境,測試管道。在端對端測試中,管道會使用 Dataflow Runner 執行,處理與實際工作環境中資料集相符或極為相似的完整參照資料集。

您可能無法產生可準確模擬真實資料的合成資料,如要解決這個問題,其中一種方法是使用從生產資料來源擷取的清除資料,建立參照資料集,並透過適當的轉換作業,將所有私密資料去識別化。建議您使用敏感資料防護。Sensitive Data Protection 可從各種內容類型和資料來源偵測機密資料,並套用各種去識別化技術,包括遮蓋、遮蔽、格式保留加密和日期轉移。

批次和串流管道的端對端測試差異

在針對大型測試資料集執行完整的端對端測試之前,您可能想先使用一小部分測試資料 (例如 1%) 執行測試,並在較短的時間內驗證預期行為。與使用 Direct Runner 的整合測試相同,您可以在使用 Dataflow Runner 執行管道時,對 PCollection 物件使用 PAssert。如要進一步瞭解 PAssert,請參閱本頁的「單元測試」一節。

視用途而定,驗證端對端測試產生的大量輸出內容可能不切實際、成本高昂,或有其他困難。在這種情況下,您可以改為驗證輸出結果集中的代表性樣本。舉例來說,您可以使用 BigQuery 抽樣並比較輸出資料列與預期結果的參考資料集。

對於串流管道,使用合成資料模擬實際串流條件可能很困難。如要為端對端測試提供串流資料,常見做法是將測試與實際工作環境中的資料來源整合。如果您使用 Pub/Sub 做為資料來源,可以透過現有主題的其他訂閱項目,為端對端測試啟用獨立的資料串流。然後比較使用相同資料的不同管道結果,這有助於根據其他前置製作和製作管道驗證候選管道。

下圖顯示這個方法如何讓生產管道和測試管道在不同的部署環境中平行執行。

使用單一 Pub/Sub 串流來源,與正式版管道並行執行測試管道。

在圖表中,兩個管道都從同一個 Pub/Sub 主題讀取資料,但使用不同的訂閱項目。這樣一來,這兩個管道就能獨立處理相同資料,方便您比較結果。測試管道使用的服務帳戶與實際工作環境專案不同,因此不會使用實際工作環境專案的 Pub/Sub 訂閱者配額。

與批次管道不同,串流管道會持續執行,直到明確取消為止。在端對端測試中,您需要決定是否要讓管道繼續執行 (也許要等到下次執行端對端測試),或在代表測試完成的某個時間點取消管道,以便檢查結果。

您使用的測試資料類型會影響這項決策。舉例來說,如果您使用提供給串流管道的受限測試資料集,當所有元素都完成處理作業時,您可能會取消管道。或者,如果您使用實際資料來源 (例如用於正式環境的現有 Pub/Sub 主題),或持續產生測試資料,則可能需要讓測試管道長時間運作。後者可讓您比較行為與正式版環境,甚至是其他測試管道。