Workflow using Cloud Composer

In this document, you use the following billable components of Google Cloud:

  • Dataproc
  • Compute Engine
  • Cloud Composer

To generate a cost estimate based on your projected usage, use the Pricing Calculator.

New Google Cloud users might be eligible for a free trial.

Before you begin

Set up your project

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc, Compute Engine, and Cloud Composer APIs. You can also enable them with the gcloud CLI, as sketched after this list.

    Enable the APIs

  5. Install the Google Cloud CLI.

  6. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  7. To initialize the gcloud CLI, run the following command:

    gcloud init
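
  If you prefer the command line to the Enable the APIs link above, the same APIs can be enabled with the gcloud CLI. A minimal sketch, assuming the gcloud CLI is initialized and set to your project:

    # Enable the services used in this tutorial: Dataproc, Compute Engine, and Cloud Composer.
    gcloud services enable \
        dataproc.googleapis.com \
        compute.googleapis.com \
        composer.googleapis.com
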
Create a Dataproc workflow template

    Copy and run the following commands in a local terminal window or in Cloud Shell to create and define a workflow template.

    1. Create the sparkpi workflow template.
      gcloud dataproc workflow-templates create sparkpi \
          --region=us-central1
            
    2. Add the Spark job to the sparkpi workflow template. The "compute" step-id flag identifies the SparkPi job. The value 1000 after the -- separator is passed to SparkPi as the number of tasks used to estimate pi.
      gcloud dataproc workflow-templates add-job spark \
          --workflow-template=sparkpi \
          --step-id=compute \
          --class=org.apache.spark.examples.SparkPi \
          --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
          --region=us-central1 \
          -- 1000
            
    3. Use a managed, single-node cluster to run the workflow. Dataproc creates the cluster, runs the workflow on it, then deletes the cluster when the workflow is done.
      gcloud dataproc workflow-templates set-managed-cluster sparkpi \
          --cluster-name=sparkpi \
          --single-node \
          --region=us-central1
            
    4. Confirm workflow template creation.

      Console

      In the Google Cloud console, open the Dataproc Workflows page, then click the sparkpi name to open the Workflow template details page and confirm the sparkpi template attributes.

      gcloud command

      Run the following command:

      gcloud dataproc workflow-templates describe sparkpi --region=us-central1
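
      If you want to verify the template end to end before wiring it into Airflow, you can also instantiate it directly. This is an optional sketch; the command creates the managed cluster, runs SparkPi, deletes the cluster, and blocks until the workflow finishes:

      # Optional: run the workflow template once, outside of Airflow.
      gcloud dataproc workflow-templates instantiate sparkpi \
          --region=us-central1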
          

Create a DAG and upload it to Cloud Storage

    1. Create a new Cloud Composer environment or use an existing one.
    2. Set Airflow variables.

      Airflow UI

      1. In the toolbar, click Admin > Variables.
      2. Click Create.
      3. Enter the following information:
        • Key: project_id
        • Val: PROJECT_ID, your Google Cloud project ID
      4. Click Save.

      gcloud command

      Enter the following command, where:

      • ENVIRONMENT is the name of the Cloud Composer environment
      • LOCATION is the region where the Cloud Composer environment is located
      • PROJECT_ID is the project ID for the project that contains the Cloud Composer environment
          gcloud composer environments run ENVIRONMENT --location LOCATION variables set -- project_id PROJECT_ID
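      For example, with hypothetical values example-environment, us-central1, and example-project substituted for the placeholders, the call would look like this:

          gcloud composer environments run example-environment \
              --location us-central1 \
              variables set -- project_id example-project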
          
    3. Copy the following DAG code, which uses the DataprocInstantiateWorkflowTemplateOperator, into a local file named composer-dataproc-dag.py.

      Airflow 2

      
      """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
      Spark Pi Job.
      
      This DAG relies on an Airflow variable
      https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
      * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
      """
      
      import datetime
      
      from airflow import models
      from airflow.providers.google.cloud.operators.dataproc import (
          DataprocInstantiateWorkflowTemplateOperator,
      )
      from airflow.utils.dates import days_ago
      
      project_id = "{{var.value.project_id}}"
      
      
      default_args = {
          # Tell airflow to start one day ago, so that it runs as soon as you upload it
          "start_date": days_ago(1),
          "project_id": project_id,
      }
      
      # Define a DAG (directed acyclic graph) of tasks.
      # Any task you create within the context manager is automatically added to the
      # DAG object.
      with models.DAG(
          # The id you will see in the DAG airflow page
          "dataproc_workflow_dag",
          default_args=default_args,
          # The interval with which to schedule the DAG
          schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
      ) as dag:
          start_template_job = DataprocInstantiateWorkflowTemplateOperator(
              # The task id of your job
              task_id="dataproc_workflow_dag",
              # The template id of your workflow
              template_id="sparkpi",
              project_id=project_id,
              # The region for the template
              region="us-central1",
          )
      

      Airflow 1

      
      """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
      Spark Pi Job.
      
      This DAG relies on an Airflow variable
      https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
      * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
      """
      
      import datetime
      
      from airflow import models
      from airflow.contrib.operators import dataproc_operator
      from airflow.utils.dates import days_ago
      
      project_id = "{{var.value.project_id}}"
      
      
      default_args = {
          # Tell airflow to start one day ago, so that it runs as soon as you upload it
          "start_date": days_ago(1),
          "project_id": project_id,
      }
      
      # Define a DAG (directed acyclic graph) of tasks.
      # Any task you create within the context manager is automatically added to the
      # DAG object.
      with models.DAG(
          # The id you will see in the DAG airflow page
          "dataproc_workflow_dag",
          default_args=default_args,
          # The interval with which to schedule the DAG
          schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
      ) as dag:
          start_template_job = dataproc_operator.DataprocWorkflowTemplateInstantiateOperator(
              # The task id of your job
              task_id="dataproc_workflow_dag",
              # The template id of your workflow
              template_id="sparkpi",
              project_id=project_id,
              # The region in which to run the Dataproc workflow template
              region="us-central1",
          )
      
    4. Upload the DAG to the environment folder in Cloud Storage, either from the console or with the gcloud CLI as sketched below. After a successful upload, click the DAGs Folder link on the Cloud Composer Environments page to confirm the file is there.
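
      A minimal upload sketch with the gcloud CLI, assuming the same ENVIRONMENT and LOCATION placeholders as above and that composer-dataproc-dag.py is in the current directory:

      # Copy the DAG file into the environment's dags/ folder in Cloud Storage.
      gcloud composer environments storage dags import \
          --environment ENVIRONMENT \
          --location LOCATION \
          --source composer-dataproc-dag.py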

View the task's status

    Airflow UI

    1. Open the Airflow web interface.
    2. On the DAGs page, click the DAG name (for example, dataproc_workflow_dag).
    3. On the DAG details page, click Graph View.
    4. Check the status:
      • Failed: the task has a red box around it. You can also hold the pointer over the task and look for State: Failed.
      • Success: the task has a green box around it. You can also hold the pointer over the task and look for State: Success.

    Console

    Click the Workflows tab to see the workflow status.

    gcloud command

    gcloud dataproc operations list \
        --region=us-central1 \
        --filter="labels.goog-dataproc-workflow-template-id=sparkpi"
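
    Each listed operation includes an operation ID, which you can use to look at a single workflow run in more detail. A sketch, where OPERATION_ID stands for an ID copied from the list output:

    gcloud dataproc operations describe OPERATION_ID \
        --region=us-central1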
        

Clean up

    To avoid incurring charges to your Google Cloud account, delete the resources used in this tutorial:

    1. Delete the Cloud Composer environment.

    2. Delete the workflow template.
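
    Both resources can also be removed from the command line. A minimal sketch, assuming the ENVIRONMENT and LOCATION placeholders used earlier:

    # Delete the Cloud Composer environment.
    gcloud composer environments delete ENVIRONMENT \
        --location LOCATION

    # Delete the sparkpi workflow template.
    gcloud dataproc workflow-templates delete sparkpi \
        --region=us-central1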

What's next