本文將說明如何從 Dataflow 管道程式碼建立自訂傳統範本。 傳統範本會封裝現有的 Dataflow 管道,建立可重複使用的範本,您只要變更特定管道參數,就能為每項工作自訂範本。您不必編寫範本,而是使用指令從現有管道產生範本。
以下是大致流程。後續章節會詳細說明這個程序。
- 在管道程式碼中,針對您要在執行階段設定或使用的所有管道選項,請使用
ValueProvider
介面。使用可接受執行階段參數的DoFn
物件。 - 使用額外的中繼資料擴充範本,這樣執行傳統範本時,就能驗證自訂參數。這類中繼資料的範例包括自訂傳統範本的名稱和選用參數。
- 檢查管道 I/O 連接器是否支援
ValueProvider
物件,並視需要進行變更。 - 建立並暫存自訂傳統範本。
- 執行自訂傳統範本。
如要瞭解不同類型的 Dataflow 範本、優點,以及何時該選擇傳統範本,請參閱「Dataflow 範本」。
執行傳統範本所需的權限
執行 Dataflow 傳統範本所需的權限,取決於您執行範本的位置,以及管道的來源和接收器是否位於其他專案中。
如要進一步瞭解如何在本機或使用 Google Cloud執行 Dataflow 管道,請參閱「Dataflow 安全性與權限」。
如需 Dataflow 角色和權限清單,請參閱「Dataflow 存取權控管」。
限制
- 傳統範本不支援下列管道選項。如要控管工作站安全帶執行緒的數量,請使用 Flex 範本。
Java
numberOfWorkerHarnessThreads
Python
number_of_worker_harness_threads
- Dataflow 執行器不支援 Pub/Sub 主題和訂閱參數的
ValueProvider
選項。如果執行階段參數需要 Pub/Sub 選項,請使用 Flex 範本。
關於執行階段參數和 ValueProvider
介面
ValueProvider
介面可讓管道接受執行階段參數。Apache Beam 提供三種類型的 ValueProvider
物件。
名稱 | 說明 |
---|---|
RuntimeValueProvider |
您可以使用 若您事先不知道該值,請使用 |
StaticValueProvider |
若您事先知道該值,請使用 |
NestedValueProvider |
若您在執行階段期間想使用該值計算另一個值,請使用 |
在管道程式碼中使用執行階段參數
本節逐步說明如何使用 ValueProvider
、StaticValueProvider
和 NestedValueProvider
。
在管道選項中使用 ValueProvider
針對您想在執行階段設定或使用的所有管道選項,請使用 ValueProvider
。
舉例來說,下列 WordCount
程式碼片段不支援執行階段參數。該程式碼會新增一個輸入檔案選項,建立管道,並從輸入檔案讀取資料行:
Java
public interface WordCountOptions extends PipelineOptions { @Description("Path of the file to read from") @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") String getInputFile(); void setInputFile(String value); } public static void main(String[] args) { WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WordCountOptions.class); Pipeline p = Pipeline.create(options); p.apply("ReadLines", TextIO.read().from(options.getInputFile())); ...
Python
class WordcountOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): parser.add_argument( '--input', default='gs://dataflow-samples/shakespeare/kinglear.txt', help='Path of the file to read from') parser.add_argument( '--output', required=True, help='Output file to write results to.') pipeline_options = PipelineOptions(['--output', 'some/output_path']) p = beam.Pipeline(options=pipeline_options) wordcount_options = pipeline_options.view_as(WordcountOptions) lines = p | 'read' >> ReadFromText(wordcount_options.input)
如要新增執行階段參數支援,請將輸入檔案選項修改成使用 ValueProvider
。
Java
對於輸入檔案選項的類型,請使用 ValueProvider<String>
,而不要使用 String
。
public interface WordCountOptions extends PipelineOptions { @Description("Path of the file to read from") @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") ValueProvider<String> getInputFile(); void setInputFile(ValueProvider<String> value); } public static void main(String[] args) { WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WordCountOptions.class); Pipeline p = Pipeline.create(options); p.apply("ReadLines", TextIO.read().from(options.getInputFile())); ...
Python
將 add_argument
替換為 add_value_provider_argument
。
class WordcountOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): # Use add_value_provider_argument for arguments to be templatable # Use add_argument as usual for non-templatable arguments parser.add_value_provider_argument( '--input', default='gs://dataflow-samples/shakespeare/kinglear.txt', help='Path of the file to read from') parser.add_argument( '--output', required=True, help='Output file to write results to.') pipeline_options = PipelineOptions(['--output', 'some/output_path']) p = beam.Pipeline(options=pipeline_options) wordcount_options = pipeline_options.view_as(WordcountOptions) lines = p | 'read' >> ReadFromText(wordcount_options.input)
在函式中使用 ValueProvider
如要在您自己的函式中使用執行階段參數值,請將函式更新為使用 ValueProvider
參數。
下列範例含有一個整數 ValueProvider
選項,以及一個可處理整數加法的簡易函式。該函式須依賴 ValueProvider
整數。在執行期間,管道會將 MySumFn
套用至含有 [1, 2, 3]
的 PCollection
裡的每個整數。如果執行階段值是 10,則產生的 PCollection
會含有 [11, 12, 13]
。
Java
public interface SumIntOptions extends PipelineOptions { // New runtime parameter, specified by the --int // option at runtime. ValueProvider<Integer> getInt(); void setInt(ValueProvider<Integer> value); } class MySumFn extends DoFn<Integer, Integer> { ValueProvider<Integer> mySumInteger; MySumFn(ValueProvider<Integer> sumInt) { // Store the value provider this.mySumInteger = sumInt; } @ProcessElement public void processElement(ProcessContext c) { // Get the value of the value provider and add it to // the element's value. c.output(c.element() + mySumInteger.get()); } } public static void main(String[] args) { SumIntOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(SumIntOptions.class); Pipeline p = Pipeline.create(options); p.apply(Create.of(1, 2, 3)) // Get the value provider and pass it to MySumFn .apply(ParDo.of(new MySumFn(options.getInt()))) .apply("ToString", MapElements.into(TypeDescriptors.strings()).via(x -> x.toString())) .apply("OutputNums", TextIO.write().to("numvalues")); p.run(); }
Python
import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.value_provider import StaticValueProvider from apache_beam.io import WriteToText class UserOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): parser.add_value_provider_argument('--templated_int', type=int) class MySumFn(beam.DoFn): def __init__(self, templated_int): self.templated_int = templated_int def process(self, an_int): yield self.templated_int.get() + an_int pipeline_options = PipelineOptions() p = beam.Pipeline(options=pipeline_options) user_options = pipeline_options.view_as(UserOptions) sum = (p | 'ReadCollection' >> beam.io.ReadFromText( 'gs://some/integer_collection') | 'StringToInt' >> beam.Map(lambda w: int(w)) | 'AddGivenInt' >> beam.ParDo(MySumFn(user_options.templated_int)) | 'WriteResultingCollection' >> WriteToText('some/output_path'))
使用StaticValueProvider
如要將靜態值提供給管道,請使用 StaticValueProvider
。
本範例使用 MySumFn
,這是會取用 ValueProvider<Integer>
的 DoFn
。如果您事先就知道參數的值,就可以使用 StaticValueProvider
,將靜態值指定為 ValueProvider
。
Java
此程式碼會在管道執行階段取用值:
.apply(ParDo.of(new MySumFn(options.getInt())))
您可以改為使用 StaticValueProvider
搭配靜態值:
.apply(ParDo.of(new MySumFn(StaticValueProvider.of(10))))
Python
此程式碼會在管道執行階段取用值:
beam.ParDo(MySumFn(user_options.templated_int))
您可以改為使用 StaticValueProvider
搭配靜態值:
beam.ParDo(MySumFn(StaticValueProvider(int,10)))
若您實作的 I/O 模組同時支援正則參數和執行階段參數,也可以使用 StaticValueProvider
。
StaticValueProvider
會減少因採用兩種類似方法而導致程式碼重複的情況。
Java
本範例的原始碼來自於 GitHub 上的 Apache Beam TextIO.java。
// Create a StaticValueProvider<String> from a regular String parameter // value, and then call .from() with this new StaticValueProvider. public Read from(String filepattern) { checkNotNull(filepattern, "Filepattern cannot be empty."); return from(StaticValueProvider.of(filepattern)); } // This method takes a ValueProvider parameter. public Read from(ValueProvider<String> filepattern) { checkNotNull(filepattern, "Filepattern cannot be empty."); return toBuilder().setFilepattern(filepattern).build(); }
Python
在本範例中,有單一建構函式可接受 string
或 ValueProvider
引數。如果引數是 string
,就會轉換成 StaticValueProvider
。
class Read(): def __init__(self, filepattern): if isinstance(filepattern, str): # Create a StaticValueProvider from a regular string parameter filepattern = StaticValueProvider(str, filepattern) self.filepattern = filepattern
使用NestedStaticValueProvider
如要計算另一個 ValueProvider
物件的值,請使用 NestedValueProvider
。
NestedValueProvider
會取用 ValueProvider
和 SerializableFunction
轉譯器做為輸入。當您在 NestedValueProvider
呼叫 .get()
,轉譯器會根據 ValueProvider
值建立新的值。此轉譯器可讓您使用 ValueProvider
值建立您想要的最終值。
在以下範例中,使用者提供檔案名稱 file.txt
。轉換作業會將路徑 gs://directory_name/
加在檔案名稱的前面。呼叫 .get()
會傳回 gs://directory_name/file.txt
。
Java
public interface WriteIntsOptions extends PipelineOptions { // New runtime parameter, specified by the --fileName // option at runtime. ValueProvider<String> getFileName(); void setFileName(ValueProvider<String> value); } public static void main(String[] args) { WriteIntsOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WriteIntsOptions.class); Pipeline p = Pipeline.create(options); p.apply(Create.of(1, 2, 3)) // Write to the computed complete file path. .apply("OutputNums", TextIO.write().to(NestedValueProvider.of( options.getFileName(), new SerializableFunction<String, String>() { @Override public String apply(String file) { return "gs://directoryname/" + file; } }))); p.run(); }
在管道程式碼中使用中繼資料
您可以用額外的中繼資料來擴充範本,如此一來,範本執行時,就能驗證自訂參數。如要為範本建立中繼資料,請按照下列步驟操作:
- 使用「中繼資料參數」中的參數,以及「中繼資料檔案範例」中的格式,建立名為
TEMPLATE_NAME_metadata
的 JSON 格式檔案。將TEMPLATE_NAME
替換為範本名稱。請確認中繼資料檔案沒有副檔名。舉例來說,如果範本名稱為
myTemplate
,則中繼資料檔案必須為myTemplate_metadata
。 - 將中繼資料檔案儲存在範本所在的 Cloud Storage 資料夾中。
中繼資料參數
參數鍵 | 必填 | 值說明 | |
---|---|---|---|
name |
是 | 範本的名稱。 | |
description |
否 | 用來說明範本的一小段文字。 | |
streaming |
否 | 如果 true ,這個範本支援串流。預設值為 false 。 |
|
supportsAtLeastOnce |
否 | 如果 true ,這個範本支援至少一次處理。預設值為 false 。如果範本的設計是搭配至少一次串流模式使用,請將這個參數設為 true 。
|
|
supportsExactlyOnce |
否 | 如果 true ,這個範本支援僅須處理一次。預設值為 true 。 |
|
defaultStreamingMode |
否 | 如果範本同時支援「至少一次」和「僅一次」模式,預設串流模式為「至少一次」。請使用下列其中一個值:"AT_LEAST_ONCE" 、"EXACTLY_ONCE" 。如未指定,預設串流模式為「恰好一次」。
|
|
parameters |
否 | 範本使用的額外參數陣列。預設會使用空陣列。 | |
name |
是 | 範本中使用的參數名稱。 | |
label |
是 | 使用者可理解的字串,用於在 Google Cloud 控制台中標記參數。 | |
helpText |
是 | 用來說明參數的一小段文字。 | |
isOptional |
否 | 如果參數為必要,則為 false ;如果參數為選用,則為 true 。除非設定值,否則 isOptional 預設為 false 。
如果中繼資料未包含這個參數鍵,中繼資料就會成為必要參數。 |
|
regexes |
否 | 字串形式的 POSIX-egrep 規則運算式陣列,用於驗證參數的值。舉例來說,["^[a-zA-Z][a-zA-Z0-9]+"] 是單一規則運算式,可驗證開頭為一個字母、後續有一或多個字元的值。預設為空陣列。 |
中繼資料檔案範例
Java
Dataflow 服務使用下列中繼資料來驗證 WordCount 範本的自訂參數:
{ "description": "An example pipeline that counts words in the input file.", "name": "Word Count", "streaming": false, "parameters": [ { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "inputFile", "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt", "label": "Input Cloud Storage file(s)" }, { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "output", "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts", "label": "Output Cloud Storage file(s)" } ] }
Python
Dataflow 服務使用下列中繼資料來驗證 WordCount 範本的自訂參數:
{ "description": "An example pipeline that counts words in the input file.", "name": "Word Count", "streaming": false, "parameters": [ { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "input", "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt", "label": "Input Cloud Storage file(s)" }, { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "output", "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts", "label": "Output Cloud Storage file(s)" } ] }
您可以從 Dataflow 範本目錄下載 Google 提供的範本中繼資料檔案。
支援的管道 I/O 連接器和 ValueProvider
Java
某些 I/O 連接器包含的方法會接受 ValueProvider
物件。如要判定特定連接器與方法的支援情況,請參閱 I/O 連接器的 API 參考說明文件。支援的方法會有 ValueProvider
超載情況。如果某方法沒有超載情況,就表示該方法不支援執行階段參數。下列 I/O 連接器至少具備部分的 ValueProvider
支援:
- 檔案型 IO:
TextIO
、AvroIO
、FileIO
、TFRecordIO
、XmlIO
BigQueryIO
*BigtableIO
(需要 SDK 2.3.0 以上版本)PubSubIO
SpannerIO
Python
某些 I/O 連接器包含的方法會接受 ValueProvider
物件。如要判定 I/O 連接器及其方法的支援情況,請參閱連接器的 API 參考說明文件。下列 I/O 連接器接受執行階段參數:
- 檔案型 IO:
textio
、avroio
、tfrecordio
建立及暫存傳統範本
編寫管道後,就必須建立及暫存範本檔案。建立及暫存範本後,暫存位置會包含執行範本所需的其他檔案。如果刪除暫存位置,範本就無法執行。 範本暫存後,Dataflow 工作不會立即執行。如要執行以自訂範本為基礎的 Dataflow 工作,可以使用 Google Cloud 主控台、Dataflow REST API 或 gcloud CLI。
以下範例說明如何暫存範本檔案:
Java
此 Maven 指令會在 --templateLocation
指定的 Cloud Storage 位置建立及暫存範本。
mvn compile exec:java \ -Dexec.mainClass=com.example.myclass \ -Dexec.args="--runner=DataflowRunner \ --project=PROJECT_ID \ --stagingLocation=gs://BUCKET_NAME/staging \ --templateLocation=gs://BUCKET_NAME/templates/TEMPLATE_NAME \ --region=REGION" \ -P dataflow-runner
確認 templateLocation
路徑正確無誤。更改下列內容:
com.example.myclass
:您的 Java 類別PROJECT_ID
:您的專案 IDBUCKET_NAME
:Cloud Storage bucket 的名稱TEMPLATE_NAME
:範本名稱REGION
:用於部署 Dataflow 工作的區域
Python
此 Python 指令會在 --template_location
指定的 Cloud Storage 位置建立及暫存範本。
python -m examples.mymodule \ --runner DataflowRunner \ --project PROJECT_ID \ --staging_location gs://BUCKET_NAME/staging \ --template_location gs://BUCKET_NAME/templates/TEMPLATE_NAME \ --region REGION
確認 template_location
路徑正確無誤。更改下列內容:
examples.mymodule
:您的 Python 模組PROJECT_ID
:您的專案 IDBUCKET_NAME
:Cloud Storage bucket 的名稱TEMPLATE_NAME
:範本名稱REGION
:用於部署 Dataflow 工作的區域
建立及暫存範本後,下一步就是執行範本。