執行傳統範本

建立並暫存 Dataflow 範本後,請使用 Google Cloud 控制台、REST API 或 Google Cloud CLI 執行範本。您可以在多種環境中部署 Dataflow 範本工作,可部署的環境包括 App Engine 標準環境、Cloud Run 函式和其他受限環境。

使用 Google Cloud 控制台

您可以使用 Google Cloud 主控台執行 Google 提供的範本和自訂 Dataflow 範本。

Google 提供的範本

如要執行 Google 提供的範本:

  1. 前往 Google Cloud 控制台的 Dataflow 頁面。
  2. 前往 Dataflow 頁面
  3. 按一下「CREATE JOB FROM TEMPLATE」(利用範本建立工作)
  4. Google Cloud 控制台的「Create Job From Template」按鈕
  5. 從「Dataflow template」(Dataflow 範本) 下拉式選單中,選取要執行且由 Google 提供的範本。
  6. WordCount 範本執行表單
  7. 在「Job Name」(工作名稱) 欄位中輸入工作名稱。
  8. 在提供的參數欄位中輸入參數值。使用 Google 提供的範本時,您不需要使用「Additional Parameters」(其他參數) 區段。
  9. 點按「執行工作」

自訂範本

如要執行自訂範本:

  1. 前往 Google Cloud 控制台的 Dataflow 頁面。
  2. 前往 Dataflow 頁面
  3. 按一下 [CREATE JOB FROM TEMPLATE] (利用範本建立工作)
  4. Google Cloud 控制台的「透過範本建立工作」按鈕
  5. 從「Dataflow template」(Dataflow 範本) 下拉式選單中選取「Custom Template」(自訂範本)
  6. 自訂範本執行表單
  7. 在「Job Name」(工作名稱) 欄位中輸入工作名稱。
  8. 請在「Template GCS Path」(範本 GCS 路徑) 欄位中輸入範本檔案的 Cloud Storage 路徑。
  9. 如果範本需要參數,請按一下「Additional parameters」(其他參數) 區段中的「ADD PARAMETER」(新增參數)。在「Name」(名稱) 和「Value」(值) 欄位中分別輸入參數的名稱和值。針對每個所需的參數重複這個步驟。
  10. 點按「執行工作」

使用 REST API。

如要透過 REST API 要求執行範本,請使用專案 ID 傳送 HTTP POST 要求,這項要求需要授權

如要進一步瞭解可用的參數,請參閱 projects.locations.templates.launch 的 REST API 參考資料。

建立自訂範本批次工作

這個 projects.locations.templates.launch 範例要求會透過一個會讀取文字檔和寫入輸出文字檔的範本來建立批次工作。如果要求成功,回應主體會包含 LaunchTemplateResponse 的執行個體。

修改下列值:

  • YOUR_PROJECT_ID 替換為您的專案 ID。
  • LOCATION 替換為您選擇的 Dataflow 地區
  • 請將 JOB_NAME 改成您選擇的工作名稱。
  • YOUR_BUCKET_NAME 替換為 Cloud Storage 值區名稱。
  • gcsPath 設為範本檔案的 Cloud Storage 位置。
  • parameters 設為鍵/值組合清單。
  • tempLocation 設為您擁有寫入權限的位置。您必須輸入值,才能執行 Google 提供的範本。
    POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://YOUR_BUCKET_NAME/templates/TemplateName
    {
        "jobName": "JOB_NAME",
        "parameters": {
            "inputFile" : "gs://YOUR_BUCKET_NAME/input/my_input.txt",
            "output": "gs://YOUR_BUCKET_NAME/output/my_output"
        },
        "environment": {
            "tempLocation": "gs://YOUR_BUCKET_NAME/temp",
            "zone": "us-central1-f"
        }
    }

建立自訂範本串流工作

這個 projects.locations.templates.launch 範例要求會透過一個從 Pub/Sub 訂閱項目讀取資料並寫入 BigQuery 資料表的傳統範本,建立串流工作。如要啟動 Flex 範本,請改用 projects.locations.flexTemplates.launch範本範例是 Google 提供的範本。您可以修改範本中的路徑,指向自訂範本。啟動 Google 提供的範本和自訂範本時,會使用相同的邏輯。在本範例中,具有適當結構定義的 BigQuery 表格必須已存在。如果成功,回應主體會包含 LaunchTemplateResponse 的執行個體。

修改下列值:

  • YOUR_PROJECT_ID 替換為您的專案 ID。
  • LOCATION 替換為您選擇的 Dataflow 地區
  • 請將 JOB_NAME 改成您選擇的工作名稱。
  • YOUR_BUCKET_NAME 替換為 Cloud Storage 值區名稱。
  • 請將 GCS_PATH 改成範本檔案的 Cloud Storage 位置。位置開頭應為 gs://
  • parameters 設為鍵/值組合清單。列出的參數是這個範本範例專屬的參數。 如果使用自訂範本,請視需要修改參數。如果您使用範例範本,請替換下列變數。
    • 請將 YOUR_SUBSCRIPTION_NAME 改成您的 Pub/Sub 訂閱名稱。
    • 請將 YOUR_DATASET 改成您的 BigQuery 資料集,並且將 YOUR_TABLE_NAME 替換成您的 BigQuery 資料表名稱。
  • tempLocation 設為您擁有寫入權限的位置。您必須輸入值,才能執行 Google 提供的範本。
    POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=GCS_PATH
    {
        "jobName": "JOB_NAME",
        "parameters": {
            "inputSubscription": "projects/YOUR_PROJECT_ID/subscriptions/YOUR_SUBSCRIPTION_NAME",
            "outputTableSpec": "YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME"
        },
        "environment": {
            "tempLocation": "gs://YOUR_BUCKET_NAME/temp",
            "zone": "us-central1-f"
        }
    }

更新自訂範本串流工作

這個 projects.locations.templates.launch 範例要求會說明如何更新範本串流工作。如要更新 Flex 範本,請改用 projects.locations.flexTemplates.launch

  1. 執行 範例 2:建立自訂範本串流工作 ,開始執行串流範本工作。
  2. 傳送下列 HTTP POST 要求,並修改下列值:
    • YOUR_PROJECT_ID 替換為您的專案 ID。
    • 請將 LOCATION 改成您要更新的工作的 Dataflow 地區
    • JOB_NAME 替換為要更新的工作確切名稱。
    • 請將 GCS_PATH 改成範本檔案的 Cloud Storage 位置。位置開頭應為 gs://
    • parameters 設為鍵/值組合清單。列出的參數是這個範本範例專屬的參數。 如果使用自訂範本,請視需要修改參數。如果您使用範例範本,請替換下列變數。
      • 請將 YOUR_SUBSCRIPTION_NAME 改成您的 Pub/Sub 訂閱名稱。
      • 請將 YOUR_DATASET 改成您的 BigQuery 資料集,並且將 YOUR_TABLE_NAME 替換成您的 BigQuery 資料表名稱。
    • 使用 environment 參數變更環境設定,例如機器類型。本例使用 n2-highmem-2 機器類型,與預設機器類型相比,每個工作站的記憶體和 CPU 更多。
        POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=GCS_PATH
        {
            "jobName": "JOB_NAME",
            "parameters": {
                "inputSubscription": "projects/YOUR_PROJECT_ID/subscriptions/YOUR_TOPIC_NAME",
                "outputTableSpec": "YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME"
            },
            "environment": {
                "machineType": "n2-highmem-2"
            },
            "update": true
        }
    
  3. 存取 Dataflow 監控介面,確認已建立名稱相同的新工作。這項工作的狀態為「已更新」

使用 Google API 用戶端程式庫

請考慮使用 Google API 用戶端程式庫,輕鬆地呼叫 Dataflow REST API。這個指令碼範例使用的是 Python 適用的 Google API 用戶端程式庫

在此範例中,您必須設定以下變數:

  • project:設為您的專案 ID。
  • job:設為您選擇的不重複的工作名稱。
  • template:設為範本檔案的 Cloud Storage 位置。
  • parameters:設為包含範本參數的字典。

如要設定區域,請加入 location 參數。

from googleapiclient.discovery import build

# project = 'your-gcp-project'
# job = 'unique-job-name'
# template = 'gs://dataflow-templates/latest/Word_Count'
# parameters = {
#     'inputFile': 'gs://dataflow-samples/shakespeare/kinglear.txt',
#     'output': 'gs://<your-gcs-bucket>/wordcount/outputs',
# }

dataflow = build("dataflow", "v1b3")
request = (
    dataflow.projects()
    .templates()
    .launch(
        projectId=project,
        gcsPath=template,
        body={
            "jobName": job,
            "parameters": parameters,
        },
    )
)

response = request.execute()

如要進一步瞭解可用的選項,請參閱 Dataflow REST API 參考資料中的 projects.locations.templates.launch 方法

使用 gcloud CLI

gcloud CLI 可以使用 gcloud dataflow jobs run 指令,執行自訂或 Google 提供的範本。執行 Google 提供之範本的範例均記錄在 Google 提供的範本頁面中。

請為下列自訂範本範例設定以下的值:

  • 請將 JOB_NAME 改成您選擇的工作名稱。
  • YOUR_BUCKET_NAME 替換為 Cloud Storage 值區名稱。
  • --gcs-location 設為範本檔案的 Cloud Storage 位置。
  • --parameters 設為要傳送到工作的逗號分隔參數清單。不允許在逗號間使用空格和值。
  • 如要防止 VM 接受儲存在專案中繼資料中的 SSH 金鑰,請搭配 block_project_ssh_keys 服務選項使用 additional-experiments 旗標: --additional-experiments=block_project_ssh_keys

建立自訂範本批次工作

這個範例會從可讀取文字檔和寫入輸出文字檔的範本中建立批次工作。

    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://YOUR_BUCKET_NAME/templates/MyTemplate \
        --parameters inputFile=gs://YOUR_BUCKET_NAME/input/my_input.txt,output=gs://YOUR_BUCKET_NAME/output/my_output

這個要求會傳回以下格式的回應:

    id: 2016-10-11_17_10_59-1234530157620696789
    projectId: YOUR_PROJECT_ID
    type: JOB_TYPE_BATCH

建立自訂範本串流工作

這個範例可從 Pub/Sub 主題中讀取並寫入 BigQuery 表格的範本中建立串流工作。具有適當結構定義的 BigQuery 表格必須已存在。

    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://YOUR_BUCKET_NAME/templates/MyTemplate \
        --parameters topic=projects/project-identifier/topics/resource-name,table=my_project:my_dataset.my_table_name

這個要求會傳回以下格式的回應:

    id: 2016-10-11_17_10_59-1234530157620696789
    projectId: YOUR_PROJECT_ID
    type: JOB_TYPE_STREAMING

如需 gcloud dataflow jobs run 指令的完整旗標清單,請參閱 gcloud CLI 參考資料

監控與疑難排解

您可以透過 Dataflow 監控介面來監控 Dataflow 工作。如果工作失敗,您可以在排解管道問題指南中找到疑難排解提示、除錯策略和常見錯誤目錄。