設定 Dataflow 管道選項

本頁說明如何為 Dataflow 工作設定管道選項。這些管道選項可設定管道的執行方式和位置,以及管道使用的資源。

管道執行作業與 Apache Beam 程式的執行作業是分開進行。您編寫的 Apache Beam 程式會建構管道,以延後執行。也就是說,程式會產生一系列步驟,任何支援的 Apache Beam 執行器都能執行。相容的執行器包括 Google Cloud 上的 Dataflow 執行器,以及直接在本機環境中執行管道的直接執行器。Google Cloud

您可以在執行階段將參數傳遞至 Dataflow 工作。 如要進一步瞭解如何在執行階段設定管道選項,請參閱「設定管道選項」。

搭配 Apache Beam SDK 使用管道選項

您可以使用下列 SDK 設定 Dataflow 工作的管道選項:

  • Python 適用的 Apache Beam SDK
  • Java 適用的 Apache Beam SDK
  • Go 適用的 Apache Beam SDK

如要使用 SDK,請使用 Apache Beam SDK 類別 PipelineOptions 設定管道執行器和其他執行參數。

指定管道選項的方法有兩種:

  • 提供管道選項清單,以程式輔助方式設定管道選項。
  • 執行管道程式碼時,直接在指令列中設定管道選項。

以程式輔助方式設定管道選項

您可以建立及修改 PipelineOptions 物件,以程式輔助方式設定管道選項。

Java

使用 PipelineOptionsFactory.fromArgs 方法建構 PipelineOptions 物件。

如需範例,請參閱本頁的「在 Dataflow 上啟動」範例一節。

Python

建立 PipelineOptions 物件。

如需範例,請參閱本頁的「在 Dataflow 上啟動」範例一節。

Go

Apache Beam SDK for Go 不支援使用 PipelineOptions 以程式輔助方式設定管道選項。請改用 Go 指令列引數。

如需範例,請參閱本頁的「在 Dataflow 上啟動」範例一節。

在指令列設定管道選項

您可以使用指令列引數設定管道選項。

Java

以下範例語法來自 Java 教學課程中的 WordCount 管道。

mvn -Pdataflow-runner compile exec:java \
  -Dexec.mainClass=org.apache.beam.examples.WordCount \
  -Dexec.args="--project=PROJECT_ID \
  --gcpTempLocation=gs://BUCKET_NAME/temp/ \
  --output=gs://BUCKET_NAME/output \
  --runner=DataflowRunner \
  --region=REGION"

更改下列內容:

  • PROJECT_ID:您的 Google Cloud 專案 ID
  • BUCKET_NAME:Cloud Storage bucket 的名稱
  • REGION:a Dataflow 區域, us-central1

Python

以下範例語法來自 Python 教學課程中的 WordCount 管道。

python -m apache_beam.examples.wordcount \
  --region DATAFLOW_REGION \
  --input gs://dataflow-samples/shakespeare/kinglear.txt \
  --output gs://STORAGE_BUCKET/results/outputs \
  --runner DataflowRunner \
  --project PROJECT_ID \
  --temp_location gs://STORAGE_BUCKET/tmp/

更改下列內容:

  • DATAFLOW_REGION:要部署 Dataflow 工作的區域,例如 europe-west1

    --region 旗標會覆寫中繼資料伺服器、本機用戶端或環境變數中設定的預設地區。

  • STORAGE_BUCKET:Cloud Storage 值區名稱

  • PROJECT_ID:專案 ID Google Cloud

Go

以下範例語法來自 Go 教學課程中的 WordCount 管道。

go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
   --output gs://BUCKET_NAME/results/outputs \
   --runner dataflow \
   --project PROJECT_ID \
   --region DATAFLOW_REGION \
   --staging_location gs://BUCKET_NAME/binaries/

更改下列內容:

  • BUCKET_NAME:Cloud Storage 值區名稱

  • PROJECT_ID:專案 ID Google Cloud

  • DATAFLOW_REGION:要部署 Dataflow 工作的區域。例如:europe-west1--region 標記會覆寫中繼資料伺服器、本機用戶端或環境變數中設定的預設地區。

設定實驗性管道選項

在 Java、Python 和 Go SDK 中,experiments 管道選項 可啟用實驗性或正式發布前的 Dataflow 功能。

以程式輔助方式設定

如要透過程式設定 experiments 選項,請使用下列語法。

Java

PipelineOptions 物件中,使用下列語法加入 experiments 選項。本範例會使用實驗旗標,將開機磁碟大小設為 80 GB。

options.setExperiments("streaming_boot_disk_size_gb=80")

如需建立 PipelineOptions 物件的範例,請參閱本頁面的「在 Dataflow 上啟動」範例一節。

Python

PipelineOptions 物件中,使用下列語法加入 experiments 選項。本範例會使用實驗旗標,將開機磁碟大小設為 80 GB。

beam_options = PipelineOptions(
  beam_args,
  experiments=['streaming_boot_disk_size_gb=80'])

如需建立 PipelineOptions 物件的範例,請參閱本頁面的「在 Dataflow 上啟動」範例一節。

Go

Apache Beam SDK for Go 不支援使用 PipelineOptions 以程式輔助方式設定管道選項。請改用 Go 指令列引數。

透過指令列設定

如要在指令列設定 experiments 選項,請使用下列語法。

Java

本範例會使用實驗旗標,將開機磁碟大小設為 80 GB。

--experiments=streaming_boot_disk_size_gb=80

Python

本範例會使用實驗旗標,將開機磁碟大小設為 80 GB。

--experiments=streaming_boot_disk_size_gb=80

Go

本範例會使用實驗旗標,將開機磁碟大小設為 80 GB。

--experiments=streaming_boot_disk_size_gb=80

在範本中設定

如要在執行 Dataflow 範本時啟用實驗功能,請使用 --additional-experiments 標記。

傳統範本

gcloud dataflow jobs run JOB_NAME --additional-experiments=EXPERIMENT[,...]

Flex 範本

gcloud dataflow flex-template run JOB_NAME --additional-experiments=EXPERIMENT[,...]

存取管道選項物件

在 Apache Beam 程式中建立 Pipeline 物件時,請傳送 PipelineOptions。Dataflow 服務執行管道時,會將 PipelineOptions 的副本傳送至每個工作站。

Java

使用 ProcessContext.getPipelineOptions 方法,在任何 ParDo 轉換的 DoFn 執行個體中存取 PipelineOptions

Python

Python 適用的 Apache Beam SDK 不支援這項功能。

Go

使用 beam.PipelineOptions 存取管道選項。

在 Dataflow 上啟動

使用 Dataflow 執行器服務,在代管 Google Cloud 資源上執行工作。透過 Dataflow 執行管道會建立 Dataflow 工作,這類工作會使用 Google Cloud專案中的 Compute Engine 和 Cloud Storage 資源。如要瞭解 Dataflow 權限,請參閱「Dataflow 安全性與權限」。

在執行管道期間,Dataflow 工作會使用 Cloud Storage 儲存暫存檔案。請關閉 Dataflow 暫存空間使用的 bucket 虛刪除功能,以免產生不必要的儲存費用。詳情請參閱從 bucket 移除虛刪除政策

設定必要選項

如要使用 Dataflow 執行管道,請設定下列管道選項:

Java

  • project:您的 Google Cloud 專案 ID。
  • runner:執行管道的管道執行器。如要執行Google Cloud ,這個項目必須設為 DataflowRunner
  • gcpTempLocation:Dataflow 的 Cloud Storage 路徑,用於暫存大部分的暫存檔案。指定的 bucket 必須事先存在。

    如未指定 gcpTempLocation,Dataflow 會使用 tempLocation 選項的值。如果未指定任一選項,Dataflow 會建立新的 Cloud Storage bucket。

Python

  • project:您的 Google Cloud 專案 ID。
  • region:Dataflow 工作的區域。
  • runner:執行管道的管道執行器。如要執行Google Cloud ,這個項目必須設為 DataflowRunner
  • temp_location:Cloud Storage 路徑,供 Dataflow 暫存管道執行期間建立的臨時工作檔案。

Go

  • project:您的 Google Cloud 專案 ID。
  • region:Dataflow 工作的區域。
  • runner:執行管道的管道執行器。如要執行Google Cloud ,這個項目必須設為 dataflow
  • staging_location:Cloud Storage 路徑,供 Dataflow 暫存管道執行期間建立的臨時工作檔案。

以程式輔助方式設定管道選項

下列程式碼範例說明如何建構管道,包括透過程式設定執行器和其他必要選項,使用 Dataflow 執行管道。

Java

// Create and set your PipelineOptions.
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);

// For cloud execution, set the Google Cloud project, staging location,
// and set DataflowRunner.
options.setProject("my-project-id");
options.setStagingLocation("gs://my-bucket/binaries");
options.setRunner(DataflowRunner.class);

// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);

Python

import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg', help='description')
args, beam_args = parser.parse_known_args()

# Create and set your PipelineOptions.
# For Cloud execution, specify DataflowRunner and set the Cloud Platform
# project, job name, temporary files location, and region.
# For more information about regions, check:
# https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
beam_options = PipelineOptions(
    beam_args,
    runner='DataflowRunner',
    project='my-project-id',
    job_name='unique-job-name',
    temp_location='gs://my-bucket/temp',
    region='us-central1')
# Note: Repeatable options like dataflow_service_options or experiments must
# be specified as a list of string(s).
# e.g. dataflow_service_options=['enable_prime']

# Create the Pipeline with the specified options.
with beam.Pipeline(options=beam_options) as pipeline:
  pass  # build your pipeline here.

Go

Go 適用的 Apache Beam SDK 使用 Go 指令列引數。使用 flag.Set() 設定旗標值。

// Use the Go flag package to parse custom options.
flag.Parse()

// Set the required options programmatically.
// For Cloud execution, specify the Dataflow runner, Google Cloud
// project ID, region, and staging location.
// For more information about regions, see
// https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
flag.Set("runner", "dataflow")
flag.Set("project", "my-project-id")
flag.Set("region", "us-central1")
flag.Set("staging_location", "gs://my-bucket/binaries")

beam.Init()

// Create the Pipeline.
p := beam.NewPipeline()
s := p.Root()

建構管道後,請指定所有的管道讀取、轉換和寫入等作業,並執行管道。

透過指令列使用管道選項

下列範例說明如何使用指令列指定的管道選項。這個範例不會以程式輔助的方式設定管道選項。

Java

// Set your PipelineOptions to the specified command-line options
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation();

// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);

Python

使用 Python argparse 模組剖析指令列選項。

# Use Python argparse module to parse custom arguments
import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# For more details on how to use argparse, take a look at:
#   https://docs.python.org/3/library/argparse.html
parser = argparse.ArgumentParser()
parser.add_argument(
    '--input-file',
    default='gs://dataflow-samples/shakespeare/kinglear.txt',
    help='The file path for the input text to process.')
parser.add_argument(
    '--output-path', required=True, help='The path prefix for output files.')
args, beam_args = parser.parse_known_args()

# Create the Pipeline with remaining arguments.
beam_options = PipelineOptions(beam_args)
with beam.Pipeline(options=beam_options) as pipeline:
  lines = (
      pipeline
      | 'Read files' >> beam.io.ReadFromText(args.input_file)
      | 'Write files' >> beam.io.WriteToText(args.output_path))

Go

使用 Go flag 套件剖析指令列選項。您必須先剖析選項,才能呼叫 beam.Init()。在這個範例中,output 是指令列選項。

// Define configuration options
var (
  output = flag.String("output", "", "Output file (required).")
)

// Parse options before beam.Init()
flag.Parse()

beam.Init()

// Input validation must be done after beam.Init()
if *output == "" {
  log.Fatal("No output provided!")
}

p := beam.NewPipeline()

建構管道後,請指定所有的管道讀取、轉換和寫入等作業,然後執行管道。

控制執行模式

當 Apache Beam 程式在 Dataflow 等服務上執行管道時,程式可以非同步執行管道,也可以等到管道完成後再執行。如要變更這項行為,請參閱下列指引。

Java

當 Apache Beam Java 程式在 Dataflow 等服務上執行管道時,通常會以非同步方式執行。如要執行管道並等待工作完成,請將 DataflowRunner 設為管道執行器,並明確呼叫 pipeline.run().waitUntilFinish()

當您使用 DataflowRunner 並在透過 pipeline.run() 傳回的 PipelineResult 物件上呼叫 waitUntilFinish() 時,管道會在 Google Cloud 上執行,但本機程式碼會等到雲端工作完成後,才傳回最後的 DataflowPipelineJob 物件。在工作執行時,Dataflow 服務會在等待期間顯示工作最新狀態和主控台訊息。

Python

當 Apache Beam Python 程式在 Dataflow 等服務上執行管道時,通常會以非同步方式執行。如要在管道執行完畢前進行封鎖,請使用 PipelineResult 物件的 wait_until_finish() 方法,這個物件是透過執行器的 run() 方法所傳回。

Go

當 Apache Beam Go 程式在 Dataflow 上執行管道時,預設會同步執行,並封鎖管道,直到管道完成為止。如果不想封鎖,有兩種做法:

  1. 在 Go 常式中啟動工作。

    go func() {
      pr, err := beamx.Run(ctx, p)
      if err != nil {
        // Handle the error.
      }
      // Send beam.PipelineResult into a channel.
      results <- pr
    }()
    // Do other operations while the pipeline runs.
    
  2. 使用 --async 指令列旗標,該旗標位於 jobopts 套件中。

如要查看執行作業詳細資料、監控進度及驗證工作完成狀態,請使用 Dataflow 監控介面Dataflow 指令列介面

使用串流來源

Java

如果管道會讀取不受限的資料來源 (例如 Pub/Sub),系統會自動在串流模式中執行該管道。

Python

如果管道使用不受限的資料來源 (例如 Pub/Sub),您必須將 streaming 選項設為 true。

Go

如果管道會讀取不受限的資料來源 (例如 Pub/Sub),系統會自動在串流模式中執行該管道。

根據預設,串流工作會使用 n1-standard-2 以上的 Compute Engine 機器類型

在本機啟動

您可以選擇在本機執行管道,而不是在代管雲端服務上執行。如要進行測試、除錯或針對小型資料集執行管道,本機執行作業有些優點特別適合這類操作。舉例來說,本機執行作業不需使用遠端 Dataflow 服務和相關 Google Cloud 專案。

當您選用本機執行作業時,搭配管道執行的小型資料集應足以容納於本機記憶體中。您可以利用 Create 轉換來建立小型的記憶體內資料集,也能透過 Read 轉換來使用小型的本機或遠端檔案。本機執行作業通常會使用較少的外部依附元件,讓您可以快速輕鬆地進行測試及偵錯,不過這類作業會因本機環境中可用的記憶體而有所限制。

下列程式碼範例說明如何建構可在本機環境中執行的管道。

Java

// Create and set our Pipeline Options.
PipelineOptions options = PipelineOptionsFactory.create();

// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);

Python

import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg')
args, beam_args = parser.parse_known_args()

# Create and set your Pipeline Options.
beam_options = PipelineOptions(beam_args)
args = beam_options.view_as(MyOptions)

with beam.Pipeline(options=beam_options) as pipeline:
  lines = (
      pipeline
      | beam.io.ReadFromText(args.input)
      | beam.io.WriteToText(args.output))

Go

// Parse options before beam.Init()
flag.Parse()

beam.Init()

p := beam.NewPipeline()

將管道建構完成後,您就可以加以執行。

建立自訂管道選項

除了標準的 PipelineOptions 外,您還可以新增自訂選項。Apache Beam 的指令列也能使用以相同格式指定的指令列引數,剖析自訂選項。

Java

如要自己新增選項,請針對每個選項定義含有 getter 和 setter 方法的介面,如以下範例所示:

public interface MyOptions extends PipelineOptions {
  String getMyCustomOption();
  void setMyCustomOption(String myCustomOption);
}

Python

如要自己新增選項,請使用 add_argument() 方法 (運作方式與 Python 的標準 argparse 模組完全相同),如以下範例所示:

from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument('--input')
    parser.add_argument('--output')

Go

如要新增自己的選項,請使用 Go 旗標套件,如以下範例所示:

var (
  input  = flag.String("input", "", "")
  output = flag.String("output", "", "")
)

此外,您可以指定當使用者將 --help 做為指令列引數傳送時顯示的說明,也能指定預設值。

Java

請利用註解來設定說明和預設值,範例如下:

public interface MyOptions extends PipelineOptions {
  @Description("My custom command line argument.")
  @Default.String("DEFAULT")
  String getMyCustomOption();
  void setMyCustomOption(String myCustomOption);
}

建議您使用 PipelineOptionsFactory 註冊介面,然後在建立 PipelineOptions 物件時傳送介面。當您使用 PipelineOptionsFactory 註冊介面時,--help 可以找到您自訂的選項介面,並將其新增至 --help 指令的輸出內容。PipelineOptionsFactory 會驗證自訂選項是否與其他已註冊的選項相容。

下列範例程式碼說明如何使用 PipelineOptionsFactory 註冊自訂選項介面:

PipelineOptionsFactory.register(MyOptions.class);
MyOptions options = PipelineOptionsFactory.fromArgs(args)
                                          .withValidation()
                                          .as(MyOptions.class);

您的管道現在可以接受將 --myCustomOption=value 做為指令列引數使用。

Python

請按照以下方式設定說明和預設值:

from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument(
        '--input',
        default='gs://dataflow-samples/shakespeare/kinglear.txt',
        help='The file path for the input text to process.')
    parser.add_argument(
        '--output', required=True, help='The path prefix for output files.')

Go

請按照以下方式設定說明和預設值:

var (
  input  = flag.String("input", "gs://MY_STORAGE_BUCKET/input", "Input for the pipeline")
  output = flag.String("output", "gs://MY_STORAGE_BUCKET/output", "Output for the pipeline")
)