建立傳統 Dataflow 範本

本文將說明如何從 Dataflow 管道程式碼建立自訂傳統範本。 傳統範本會封裝現有的 Dataflow 管道,建立可重複使用的範本,您只要變更特定管道參數,就能為每項工作自訂範本。您不必編寫範本,而是使用指令從現有管道產生範本。

以下是大致流程。後續章節會詳細說明這個程序。

  1. 在管道程式碼中,針對您要在執行階段設定或使用的所有管道選項,請使用 ValueProvider 介面。使用可接受執行階段參數的 DoFn 物件。
  2. 使用額外的中繼資料擴充範本,這樣執行傳統範本時,就能驗證自訂參數。這類中繼資料的範例包括自訂傳統範本的名稱和選用參數。
  3. 檢查管道 I/O 連接器是否支援 ValueProvider 物件,並視需要進行變更。
  4. 建立並暫存自訂傳統範本。
  5. 執行自訂傳統範本。

如要瞭解不同類型的 Dataflow 範本、優點,以及何時該選擇傳統範本,請參閱「Dataflow 範本」。

執行傳統範本所需的權限

執行 Dataflow 傳統範本所需的權限,取決於您執行範本的位置,以及管道的來源和接收器是否位於其他專案中。

如要進一步瞭解如何在本機或使用 Google Cloud執行 Dataflow 管道,請參閱「Dataflow 安全性與權限」。

如需 Dataflow 角色和權限清單,請參閱「Dataflow 存取權控管」。

限制

  • 傳統範本不支援下列管道選項。如要控管工作站安全帶執行緒的數量,請使用 Flex 範本

    Java

    numberOfWorkerHarnessThreads
      

    Python

    number_of_worker_harness_threads
      
  • Dataflow 執行器不支援 Pub/Sub 主題和訂閱參數的 ValueProvider 選項。如果執行階段參數需要 Pub/Sub 選項,請使用 Flex 範本。

關於執行階段參數和 ValueProvider 介面

ValueProvider 介面可讓管道接受執行階段參數。Apache Beam 提供三種類型的 ValueProvider 物件。

名稱 說明
RuntimeValueProvider

RuntimeValueProvider 是預設的 ValueProvider 類型。 RuntimeValueProvider 可讓管道接受僅在管道執行期間可用的值。該值在管道建構期間不提供,因此您無法使用該值變更管道的工作流程圖。

您可以使用 isAccessible() 查看 ValueProvider 的值是否可用。如果您在管道執行前就先呼叫 get(),Apache Beam 會傳回錯誤:
Value only available at runtime, but accessed from a non-runtime context.

若您事先不知道該值,請使用 RuntimeValueProvider。如要在執行階段變更參數值,請勿在範本中設定參數值。 從範本建立作業時,請設定參數值。

StaticValueProvider

StaticValueProvider 可讓您提供靜態值給管道。該值在管道建構期間可用,因此您可以使用該值變更管道的工作流程圖。

若您事先知道該值,請使用 StaticValueProvider。範例請參閱 StaticValueProvider 一節

NestedValueProvider

NestedValueProvider 可讓您計算另一個 ValueProvider 物件的值。NestedValueProvider 會包含 ValueProvider,而包含的 ValueProvider 的類型可決定該值在管道建構期間是否可供存取。

若您在執行階段期間想使用該值計算另一個值,請使用 NestedValueProvider。範例請參閱 NestedValueProvider 一節

在管道程式碼中使用執行階段參數

本節逐步說明如何使用 ValueProviderStaticValueProviderNestedValueProvider

在管道選項中使用 ValueProvider

針對您想在執行階段設定或使用的所有管道選項,請使用 ValueProvider

舉例來說,下列 WordCount 程式碼片段不支援執行階段參數。該程式碼會新增一個輸入檔案選項,建立管道,並從輸入檔案讀取資料行:

Java

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    String getInputFile();
    void setInputFile(String value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
    ...

Python

  class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountOptions)
  lines = p | 'read' >> ReadFromText(wordcount_options.input)

如要新增執行階段參數支援,請將輸入檔案選項修改成使用 ValueProvider

Java

對於輸入檔案選項的類型,請使用 ValueProvider<String>,而不要使用 String

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    ValueProvider<String> getInputFile();
    void setInputFile(ValueProvider<String> value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
    ...

Python

add_argument 替換為 add_value_provider_argument

 class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      # Use add_value_provider_argument for arguments to be templatable
      # Use add_argument as usual for non-templatable arguments
      parser.add_value_provider_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountOptions)
  lines = p | 'read' >> ReadFromText(wordcount_options.input)

在函式中使用 ValueProvider

如要在您自己的函式中使用執行階段參數值,請將函式更新為使用 ValueProvider 參數。

下列範例含有一個整數 ValueProvider 選項,以及一個可處理整數加法的簡易函式。該函式須依賴 ValueProvider 整數。在執行期間,管道會將 MySumFn 套用至含有 [1, 2, 3]PCollection 裡的每個整數。如果執行階段值是 10,則產生的 PCollection 會含有 [11, 12, 13]

Java

  public interface SumIntOptions extends PipelineOptions {
      // New runtime parameter, specified by the --int
      // option at runtime.
      ValueProvider<Integer> getInt();
      void setInt(ValueProvider<Integer> value);
  }

  class MySumFn extends DoFn<Integer, Integer> {
      ValueProvider<Integer> mySumInteger;

      MySumFn(ValueProvider<Integer> sumInt) {
          // Store the value provider
          this.mySumInteger = sumInt;
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
         // Get the value of the value provider and add it to
         // the element's value.
         c.output(c.element() + mySumInteger.get());
      }
  }

  public static void main(String[] args) {
    SumIntOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(SumIntOptions.class);

    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
      // Get the value provider and pass it to MySumFn
     .apply(ParDo.of(new MySumFn(options.getInt())))
     .apply("ToString", MapElements.into(TypeDescriptors.strings()).via(x -> x.toString()))
     .apply("OutputNums", TextIO.write().to("numvalues"));

    p.run();
  }

Python

  import apache_beam as beam
  from apache_beam.options.pipeline_options import PipelineOptions
  from apache_beam.options.value_provider import StaticValueProvider
  from apache_beam.io import WriteToText

  class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_value_provider_argument('--templated_int', type=int)

  class MySumFn(beam.DoFn):
    def __init__(self, templated_int):
      self.templated_int = templated_int

    def process(self, an_int):
      yield self.templated_int.get() + an_int

  pipeline_options = PipelineOptions()
  p = beam.Pipeline(options=pipeline_options)

  user_options = pipeline_options.view_as(UserOptions)
  sum = (p
         | 'ReadCollection' >> beam.io.ReadFromText(
             'gs://some/integer_collection')
         | 'StringToInt' >> beam.Map(lambda w: int(w))
         | 'AddGivenInt' >> beam.ParDo(MySumFn(user_options.templated_int))
         | 'WriteResultingCollection' >> WriteToText('some/output_path'))

使用StaticValueProvider

如要將靜態值提供給管道,請使用 StaticValueProvider

本範例使用 MySumFn,這是會取用 ValueProvider<Integer>DoFn。如果您事先就知道參數的值,就可以使用 StaticValueProvider,將靜態值指定為 ValueProvider

Java

此程式碼會在管道執行階段取用值:

  .apply(ParDo.of(new MySumFn(options.getInt())))

您可以改為使用 StaticValueProvider 搭配靜態值:

  .apply(ParDo.of(new MySumFn(StaticValueProvider.of(10))))

Python

此程式碼會在管道執行階段取用值:

  beam.ParDo(MySumFn(user_options.templated_int))

您可以改為使用 StaticValueProvider 搭配靜態值:

  beam.ParDo(MySumFn(StaticValueProvider(int,10)))

若您實作的 I/O 模組同時支援正則參數和執行階段參數,也可以使用 StaticValueProviderStaticValueProvider 會減少因採用兩種類似方法而導致程式碼重複的情況。

Java

本範例的原始碼來自於 GitHub 上的 Apache Beam TextIO.java

  // Create a StaticValueProvider<String> from a regular String parameter
  // value, and then call .from() with this new StaticValueProvider.
  public Read from(String filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return from(StaticValueProvider.of(filepattern));
  }

  // This method takes a ValueProvider parameter.
  public Read from(ValueProvider<String> filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return toBuilder().setFilepattern(filepattern).build();
  }

Python

在本範例中,有單一建構函式可接受 stringValueProvider 引數。如果引數是 string,就會轉換成 StaticValueProvider

class Read():

  def __init__(self, filepattern):
    if isinstance(filepattern, str):
      # Create a StaticValueProvider from a regular string parameter
      filepattern = StaticValueProvider(str, filepattern)

    self.filepattern = filepattern

使用NestedStaticValueProvider

如要計算另一個 ValueProvider 物件的值,請使用 NestedValueProvider

NestedValueProvider 會取用 ValueProviderSerializableFunction 轉譯器做為輸入。當您在 NestedValueProvider 呼叫 .get(),轉譯器會根據 ValueProvider 值建立新的值。此轉譯器可讓您使用 ValueProvider 值建立您想要的最終值。

在以下範例中,使用者提供檔案名稱 file.txt。轉換作業會將路徑 gs://directory_name/ 加在檔案名稱的前面。呼叫 .get() 會傳回 gs://directory_name/file.txt

Java

  public interface WriteIntsOptions extends PipelineOptions {
      // New runtime parameter, specified by the --fileName
      // option at runtime.
      ValueProvider<String> getFileName();
      void setFileName(ValueProvider<String> value);
  }

  public static void main(String[] args) {
     WriteIntsOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WriteIntsOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
     // Write to the computed complete file path.
     .apply("OutputNums", TextIO.write().to(NestedValueProvider.of(
        options.getFileName(),
        new SerializableFunction<String, String>() {
          @Override
          public String apply(String file) {
            return "gs://directoryname/" + file;
          }
        })));

    p.run();
  }

在管道程式碼中使用中繼資料

您可以用額外的中繼資料來擴充範本,如此一來,範本執行時,就能驗證自訂參數。如要為範本建立中繼資料,請按照下列步驟操作:

  1. 使用「中繼資料參數」中的參數,以及「中繼資料檔案範例」中的格式,建立名為 TEMPLATE_NAME_metadata 的 JSON 格式檔案。將 TEMPLATE_NAME 替換為範本名稱。

    請確認中繼資料檔案沒有副檔名。舉例來說,如果範本名稱為 myTemplate,則中繼資料檔案必須為 myTemplate_metadata

  2. 將中繼資料檔案儲存在範本所在的 Cloud Storage 資料夾中。

中繼資料參數

參數鍵 必填 值說明
name 範本的名稱。
description 用來說明範本的一小段文字。
streaming 如果 true,這個範本支援串流。預設值為 false
supportsAtLeastOnce 如果 true,這個範本支援至少一次處理。預設值為 false。如果範本的設計是搭配至少一次串流模式使用,請將這個參數設為 true
supportsExactlyOnce 如果 true,這個範本支援僅須處理一次。預設值為 true
defaultStreamingMode 如果範本同時支援「至少一次」和「僅一次」模式,預設串流模式為「至少一次」。請使用下列其中一個值:"AT_LEAST_ONCE""EXACTLY_ONCE"。如未指定,預設串流模式為「恰好一次」。
parameters 範本使用的額外參數陣列。預設會使用空陣列。
name 範本中使用的參數名稱。
label 使用者可理解的字串,用於在 Google Cloud 控制台中標記參數。
helpText 用來說明參數的一小段文字。
isOptional 如果參數為必要,則為 false;如果參數為選用,則為 true。除非設定值,否則 isOptional 預設為 false。 如果中繼資料未包含這個參數鍵,中繼資料就會成為必要參數。
regexes 字串形式的 POSIX-egrep 規則運算式陣列,用於驗證參數的值。舉例來說,["^[a-zA-Z][a-zA-Z0-9]+"] 是單一規則運算式,可驗證開頭為一個字母、後續有一或多個字元的值。預設為空陣列。

中繼資料檔案範例

Java

Dataflow 服務使用下列中繼資料來驗證 WordCount 範本的自訂參數:

{
  "description": "An example pipeline that counts words in the input file.",
  "name": "Word Count",
  "streaming": false,
  "parameters": [
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "inputFile",
      "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt",
      "label": "Input Cloud Storage file(s)"
    },
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "output",
      "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts",
      "label": "Output Cloud Storage file(s)"
    }
  ]
}

Python

Dataflow 服務使用下列中繼資料來驗證 WordCount 範本的自訂參數:

{
  "description": "An example pipeline that counts words in the input file.",
  "name": "Word Count",
  "streaming": false,
  "parameters": [
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "input",
      "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt",
      "label": "Input Cloud Storage file(s)"
    },
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "output",
      "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts",
      "label": "Output Cloud Storage file(s)"
    }
  ]
}

您可以從 Dataflow 範本目錄下載 Google 提供的範本中繼資料檔案。

支援的管道 I/O 連接器和 ValueProvider

Java

某些 I/O 連接器包含的方法會接受 ValueProvider 物件。如要判定特定連接器與方法的支援情況,請參閱 I/O 連接器的 API 參考說明文件。支援的方法會有 ValueProvider 超載情況。如果某方法沒有超載情況,就表示該方法不支援執行階段參數。下列 I/O 連接器至少具備部分的 ValueProvider 支援:

  • 檔案型 IO:TextIOAvroIOFileIOTFRecordIOXmlIO
  • BigQueryIO*
  • BigtableIO (需要 SDK 2.3.0 以上版本)
  • PubSubIO
  • SpannerIO

Python

某些 I/O 連接器包含的方法會接受 ValueProvider 物件。如要判定 I/O 連接器及其方法的支援情況,請參閱連接器的 API 參考說明文件。下列 I/O 連接器接受執行階段參數:

  • 檔案型 IO:textioavroiotfrecordio

建立及暫存傳統範本

編寫管道後,就必須建立及暫存範本檔案。建立及暫存範本後,暫存位置會包含執行範本所需的其他檔案。如果刪除暫存位置,範本就無法執行。 範本暫存後,Dataflow 工作不會立即執行。如要執行以自訂範本為基礎的 Dataflow 工作,可以使用 Google Cloud 主控台Dataflow REST APIgcloud CLI

以下範例說明如何暫存範本檔案:

Java

此 Maven 指令會在 --templateLocation 指定的 Cloud Storage 位置建立及暫存範本。

    mvn compile exec:java \
     -Dexec.mainClass=com.example.myclass \
     -Dexec.args="--runner=DataflowRunner \
                  --project=PROJECT_ID \
                  --stagingLocation=gs://BUCKET_NAME/staging \
                  --templateLocation=gs://BUCKET_NAME/templates/TEMPLATE_NAME \
                  --region=REGION" \
     -P dataflow-runner
    

確認 templateLocation 路徑正確無誤。更改下列內容:

  • com.example.myclass:您的 Java 類別
  • PROJECT_ID:您的專案 ID
  • BUCKET_NAME:Cloud Storage bucket 的名稱
  • TEMPLATE_NAME:範本名稱
  • REGION:用於部署 Dataflow 工作的區域

Python

此 Python 指令會在 --template_location 指定的 Cloud Storage 位置建立及暫存範本。

  python -m examples.mymodule \
    --runner DataflowRunner \
    --project PROJECT_ID \
    --staging_location gs://BUCKET_NAME/staging \
    --template_location gs://BUCKET_NAME/templates/TEMPLATE_NAME \
    --region REGION

確認 template_location 路徑正確無誤。更改下列內容:

  • examples.mymodule:您的 Python 模組
  • PROJECT_ID:您的專案 ID
  • BUCKET_NAME:Cloud Storage bucket 的名稱
  • TEMPLATE_NAME:範本名稱
  • REGION:用於部署 Dataflow 工作的區域

建立及暫存範本後,下一步就是執行範本