本頁說明如何為 Dataflow 工作設定管道選項。這些管道選項可設定管道的執行方式和位置,以及管道使用的資源。
管道執行作業與 Apache Beam 程式的執行作業是分開進行。您編寫的 Apache Beam 程式會建構管道,以延後執行。也就是說,程式會產生一系列步驟,任何支援的 Apache Beam 執行器都能執行。相容的執行器包括 Google Cloud 上的 Dataflow 執行器,以及直接在本機環境中執行管道的直接執行器。Google Cloud
您可以在執行階段將參數傳遞至 Dataflow 工作。 如要進一步瞭解如何在執行階段設定管道選項,請參閱「設定管道選項」。
搭配 Apache Beam SDK 使用管道選項
您可以使用下列 SDK 設定 Dataflow 工作的管道選項:
- Python 適用的 Apache Beam SDK
- Java 適用的 Apache Beam SDK
- Go 適用的 Apache Beam SDK
如要使用 SDK,請使用 Apache Beam SDK 類別 PipelineOptions
設定管道執行器和其他執行參數。
指定管道選項的方法有兩種:
- 提供管道選項清單,以程式輔助方式設定管道選項。
- 執行管道程式碼時,直接在指令列中設定管道選項。
以程式輔助方式設定管道選項
您可以建立及修改 PipelineOptions
物件,以程式輔助方式設定管道選項。
Java
使用 PipelineOptionsFactory.fromArgs
方法建構 PipelineOptions
物件。
如需範例,請參閱本頁的「在 Dataflow 上啟動」範例一節。
Python
建立 PipelineOptions
物件。
如需範例,請參閱本頁的「在 Dataflow 上啟動」範例一節。
Go
Apache Beam SDK for Go 不支援使用 PipelineOptions
以程式輔助方式設定管道選項。請改用 Go 指令列引數。
如需範例,請參閱本頁的「在 Dataflow 上啟動」範例一節。
在指令列設定管道選項
您可以使用指令列引數設定管道選項。
Java
以下範例語法來自 Java 教學課程中的 WordCount
管道。
mvn -Pdataflow-runner compile exec:java \
-Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--project=PROJECT_ID \
--gcpTempLocation=gs://BUCKET_NAME/temp/ \
--output=gs://BUCKET_NAME/output \
--runner=DataflowRunner \
--region=REGION"
更改下列內容:
PROJECT_ID
:您的 Google Cloud 專案 IDBUCKET_NAME
:Cloud Storage bucket 的名稱REGION
:a Dataflow 區域,us-central1
Python
以下範例語法來自 Python 教學課程中的 WordCount
管道。
python -m apache_beam.examples.wordcount \
--region DATAFLOW_REGION \
--input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://STORAGE_BUCKET/results/outputs \
--runner DataflowRunner \
--project PROJECT_ID \
--temp_location gs://STORAGE_BUCKET/tmp/
更改下列內容:
DATAFLOW_REGION
:要部署 Dataflow 工作的區域,例如europe-west1
--region
旗標會覆寫中繼資料伺服器、本機用戶端或環境變數中設定的預設地區。STORAGE_BUCKET
:Cloud Storage 值區名稱PROJECT_ID
:專案 ID Google Cloud
Go
以下範例語法來自 Go 教學課程中的 WordCount
管道。
go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://BUCKET_NAME/results/outputs \
--runner dataflow \
--project PROJECT_ID \
--region DATAFLOW_REGION \
--staging_location gs://BUCKET_NAME/binaries/
更改下列內容:
BUCKET_NAME
:Cloud Storage 值區名稱PROJECT_ID
:專案 ID Google CloudDATAFLOW_REGION
:要部署 Dataflow 工作的區域。例如:europe-west1
。--region
標記會覆寫中繼資料伺服器、本機用戶端或環境變數中設定的預設地區。
設定實驗性管道選項
在 Java、Python 和 Go SDK 中,experiments
管道選項
可啟用實驗性或正式發布前的 Dataflow 功能。
以程式輔助方式設定
如要透過程式設定 experiments
選項,請使用下列語法。
Java
在 PipelineOptions
物件中,使用下列語法加入 experiments
選項。本範例會使用實驗旗標,將開機磁碟大小設為 80 GB。
options.setExperiments("streaming_boot_disk_size_gb=80")
如需建立 PipelineOptions
物件的範例,請參閱本頁面的「在 Dataflow 上啟動」範例一節。
Python
在 PipelineOptions
物件中,使用下列語法加入 experiments
選項。本範例會使用實驗旗標,將開機磁碟大小設為 80 GB。
beam_options = PipelineOptions(
beam_args,
experiments=['streaming_boot_disk_size_gb=80'])
如需建立 PipelineOptions
物件的範例,請參閱本頁面的「在 Dataflow 上啟動」範例一節。
Go
Apache Beam SDK for Go 不支援使用 PipelineOptions
以程式輔助方式設定管道選項。請改用 Go 指令列引數。
透過指令列設定
如要在指令列設定 experiments
選項,請使用下列語法。
Java
本範例會使用實驗旗標,將開機磁碟大小設為 80 GB。
--experiments=streaming_boot_disk_size_gb=80
Python
本範例會使用實驗旗標,將開機磁碟大小設為 80 GB。
--experiments=streaming_boot_disk_size_gb=80
Go
本範例會使用實驗旗標,將開機磁碟大小設為 80 GB。
--experiments=streaming_boot_disk_size_gb=80
在範本中設定
如要在執行 Dataflow 範本時啟用實驗功能,請使用 --additional-experiments
標記。
傳統範本
gcloud dataflow jobs run JOB_NAME --additional-experiments=EXPERIMENT[,...]
Flex 範本
gcloud dataflow flex-template run JOB_NAME --additional-experiments=EXPERIMENT[,...]
存取管道選項物件
在 Apache Beam 程式中建立 Pipeline
物件時,請傳送 PipelineOptions
。Dataflow 服務執行管道時,會將 PipelineOptions
的副本傳送至每個工作站。
Java
使用 ProcessContext.getPipelineOptions
方法,在任何 ParDo
轉換的 DoFn
執行個體中存取 PipelineOptions
。
Python
Python 適用的 Apache Beam SDK 不支援這項功能。
Go
使用 beam.PipelineOptions
存取管道選項。
在 Dataflow 上啟動
使用 Dataflow 執行器服務,在代管 Google Cloud 資源上執行工作。透過 Dataflow 執行管道會建立 Dataflow 工作,這類工作會使用 Google Cloud專案中的 Compute Engine 和 Cloud Storage 資源。如要瞭解 Dataflow 權限,請參閱「Dataflow 安全性與權限」。
在執行管道期間,Dataflow 工作會使用 Cloud Storage 儲存暫存檔案。請關閉 Dataflow 暫存空間使用的 bucket 虛刪除功能,以免產生不必要的儲存費用。詳情請參閱從 bucket 移除虛刪除政策。
設定必要選項
如要使用 Dataflow 執行管道,請設定下列管道選項:
Java
project
:您的 Google Cloud 專案 ID。runner
:執行管道的管道執行器。如要執行Google Cloud ,這個項目必須設為DataflowRunner
。gcpTempLocation
:Dataflow 的 Cloud Storage 路徑,用於暫存大部分的暫存檔案。指定的 bucket 必須事先存在。如未指定
gcpTempLocation
,Dataflow 會使用tempLocation
選項的值。如果未指定任一選項,Dataflow 會建立新的 Cloud Storage bucket。
Python
project
:您的 Google Cloud 專案 ID。region
:Dataflow 工作的區域。runner
:執行管道的管道執行器。如要執行Google Cloud ,這個項目必須設為DataflowRunner
。temp_location
:Cloud Storage 路徑,供 Dataflow 暫存管道執行期間建立的臨時工作檔案。
Go
project
:您的 Google Cloud 專案 ID。region
:Dataflow 工作的區域。runner
:執行管道的管道執行器。如要執行Google Cloud ,這個項目必須設為dataflow
。staging_location
:Cloud Storage 路徑,供 Dataflow 暫存管道執行期間建立的臨時工作檔案。
以程式輔助方式設定管道選項
下列程式碼範例說明如何建構管道,包括透過程式設定執行器和其他必要選項,使用 Dataflow 執行管道。
Java
// Create and set your PipelineOptions.
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
// For cloud execution, set the Google Cloud project, staging location,
// and set DataflowRunner.
options.setProject("my-project-id");
options.setStagingLocation("gs://my-bucket/binaries");
options.setRunner(DataflowRunner.class);
// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);
Python
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg', help='description')
args, beam_args = parser.parse_known_args()
# Create and set your PipelineOptions.
# For Cloud execution, specify DataflowRunner and set the Cloud Platform
# project, job name, temporary files location, and region.
# For more information about regions, check:
# https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
beam_options = PipelineOptions(
beam_args,
runner='DataflowRunner',
project='my-project-id',
job_name='unique-job-name',
temp_location='gs://my-bucket/temp',
region='us-central1')
# Note: Repeatable options like dataflow_service_options or experiments must
# be specified as a list of string(s).
# e.g. dataflow_service_options=['enable_prime']
# Create the Pipeline with the specified options.
with beam.Pipeline(options=beam_options) as pipeline:
pass # build your pipeline here.
Go
Go 適用的 Apache Beam SDK 使用 Go 指令列引數。使用 flag.Set()
設定旗標值。
// Use the Go flag package to parse custom options.
flag.Parse()
// Set the required options programmatically.
// For Cloud execution, specify the Dataflow runner, Google Cloud
// project ID, region, and staging location.
// For more information about regions, see
// https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
flag.Set("runner", "dataflow")
flag.Set("project", "my-project-id")
flag.Set("region", "us-central1")
flag.Set("staging_location", "gs://my-bucket/binaries")
beam.Init()
// Create the Pipeline.
p := beam.NewPipeline()
s := p.Root()
建構管道後,請指定所有的管道讀取、轉換和寫入等作業,並執行管道。
透過指令列使用管道選項
下列範例說明如何使用指令列指定的管道選項。這個範例不會以程式輔助的方式設定管道選項。
Java
// Set your PipelineOptions to the specified command-line options
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation();
// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);
Python
使用 Python argparse 模組剖析指令列選項。
# Use Python argparse module to parse custom arguments
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
# For more details on how to use argparse, take a look at:
# https://docs.python.org/3/library/argparse.html
parser = argparse.ArgumentParser()
parser.add_argument(
'--input-file',
default='gs://dataflow-samples/shakespeare/kinglear.txt',
help='The file path for the input text to process.')
parser.add_argument(
'--output-path', required=True, help='The path prefix for output files.')
args, beam_args = parser.parse_known_args()
# Create the Pipeline with remaining arguments.
beam_options = PipelineOptions(beam_args)
with beam.Pipeline(options=beam_options) as pipeline:
lines = (
pipeline
| 'Read files' >> beam.io.ReadFromText(args.input_file)
| 'Write files' >> beam.io.WriteToText(args.output_path))
Go
使用 Go flag
套件剖析指令列選項。您必須先剖析選項,才能呼叫 beam.Init()
。在這個範例中,output
是指令列選項。
// Define configuration options
var (
output = flag.String("output", "", "Output file (required).")
)
// Parse options before beam.Init()
flag.Parse()
beam.Init()
// Input validation must be done after beam.Init()
if *output == "" {
log.Fatal("No output provided!")
}
p := beam.NewPipeline()
建構管道後,請指定所有的管道讀取、轉換和寫入等作業,然後執行管道。
控制執行模式
當 Apache Beam 程式在 Dataflow 等服務上執行管道時,程式可以非同步執行管道,也可以等到管道完成後再執行。如要變更這項行為,請參閱下列指引。
Java
當 Apache Beam Java 程式在 Dataflow 等服務上執行管道時,通常會以非同步方式執行。如要執行管道並等待工作完成,請將 DataflowRunner
設為管道執行器,並明確呼叫 pipeline.run().waitUntilFinish()
。
當您使用 DataflowRunner
並在透過 pipeline.run()
傳回的 PipelineResult
物件上呼叫 waitUntilFinish()
時,管道會在 Google Cloud 上執行,但本機程式碼會等到雲端工作完成後,才傳回最後的 DataflowPipelineJob
物件。在工作執行時,Dataflow 服務會在等待期間顯示工作最新狀態和主控台訊息。
Python
當 Apache Beam Python 程式在 Dataflow 等服務上執行管道時,通常會以非同步方式執行。如要在管道執行完畢前進行封鎖,請使用 PipelineResult
物件的 wait_until_finish()
方法,這個物件是透過執行器的 run()
方法所傳回。
Go
當 Apache Beam Go 程式在 Dataflow 上執行管道時,預設會同步執行,並封鎖管道,直到管道完成為止。如果不想封鎖,有兩種做法:
在 Go 常式中啟動工作。
go func() { pr, err := beamx.Run(ctx, p) if err != nil { // Handle the error. } // Send beam.PipelineResult into a channel. results <- pr }() // Do other operations while the pipeline runs.
使用
--async
指令列旗標,該旗標位於jobopts
套件中。
如要查看執行作業詳細資料、監控進度及驗證工作完成狀態,請使用 Dataflow 監控介面或 Dataflow 指令列介面。
使用串流來源
Java
如果管道會讀取不受限的資料來源 (例如 Pub/Sub),系統會自動在串流模式中執行該管道。
Python
如果管道使用不受限的資料來源 (例如 Pub/Sub),您必須將 streaming
選項設為 true。
Go
如果管道會讀取不受限的資料來源 (例如 Pub/Sub),系統會自動在串流模式中執行該管道。
根據預設,串流工作會使用 n1-standard-2
以上的 Compute Engine 機器類型。
在本機啟動
您可以選擇在本機執行管道,而不是在代管雲端服務上執行。如要進行測試、除錯或針對小型資料集執行管道,本機執行作業有些優點特別適合這類操作。舉例來說,本機執行作業不需使用遠端 Dataflow 服務和相關 Google Cloud 專案。
當您選用本機執行作業時,搭配管道執行的小型資料集應足以容納於本機記憶體中。您可以利用 Create
轉換來建立小型的記憶體內資料集,也能透過 Read
轉換來使用小型的本機或遠端檔案。本機執行作業通常會使用較少的外部依附元件,讓您可以快速輕鬆地進行測試及偵錯,不過這類作業會因本機環境中可用的記憶體而有所限制。
下列程式碼範例說明如何建構可在本機環境中執行的管道。
Java
// Create and set our Pipeline Options.
PipelineOptions options = PipelineOptionsFactory.create();
// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);
Python
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg')
args, beam_args = parser.parse_known_args()
# Create and set your Pipeline Options.
beam_options = PipelineOptions(beam_args)
args = beam_options.view_as(MyOptions)
with beam.Pipeline(options=beam_options) as pipeline:
lines = (
pipeline
| beam.io.ReadFromText(args.input)
| beam.io.WriteToText(args.output))
Go
// Parse options before beam.Init()
flag.Parse()
beam.Init()
p := beam.NewPipeline()
將管道建構完成後,您就可以加以執行。
建立自訂管道選項
除了標準的 PipelineOptions
外,您還可以新增自訂選項。Apache Beam 的指令列也能使用以相同格式指定的指令列引數,剖析自訂選項。
Java
如要自己新增選項,請針對每個選項定義含有 getter 和 setter 方法的介面,如以下範例所示:
public interface MyOptions extends PipelineOptions {
String getMyCustomOption();
void setMyCustomOption(String myCustomOption);
}
Python
如要自己新增選項,請使用 add_argument()
方法 (運作方式與 Python 的標準 argparse 模組完全相同),如以下範例所示:
from apache_beam.options.pipeline_options import PipelineOptions
class MyOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument('--input')
parser.add_argument('--output')
Go
如要新增自己的選項,請使用 Go 旗標套件,如以下範例所示:
var (
input = flag.String("input", "", "")
output = flag.String("output", "", "")
)
此外,您可以指定當使用者將 --help
做為指令列引數傳送時顯示的說明,也能指定預設值。
Java
請利用註解來設定說明和預設值,範例如下:
public interface MyOptions extends PipelineOptions {
@Description("My custom command line argument.")
@Default.String("DEFAULT")
String getMyCustomOption();
void setMyCustomOption(String myCustomOption);
}
建議您使用 PipelineOptionsFactory
註冊介面,然後在建立 PipelineOptions
物件時傳送介面。當您使用 PipelineOptionsFactory
註冊介面時,--help
可以找到您自訂的選項介面,並將其新增至 --help
指令的輸出內容。PipelineOptionsFactory
會驗證自訂選項是否與其他已註冊的選項相容。
下列範例程式碼說明如何使用 PipelineOptionsFactory
註冊自訂選項介面:
PipelineOptionsFactory.register(MyOptions.class);
MyOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(MyOptions.class);
您的管道現在可以接受將 --myCustomOption=value
做為指令列引數使用。
Python
請按照以下方式設定說明和預設值:
from apache_beam.options.pipeline_options import PipelineOptions
class MyOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument(
'--input',
default='gs://dataflow-samples/shakespeare/kinglear.txt',
help='The file path for the input text to process.')
parser.add_argument(
'--output', required=True, help='The path prefix for output files.')
Go
請按照以下方式設定說明和預設值:
var (
input = flag.String("input", "gs://MY_STORAGE_BUCKET/input", "Input for the pipeline")
output = flag.String("output", "gs://MY_STORAGE_BUCKET/output", "Output for the pipeline")
)