Cloud Composer を使用するワークフロー

このドキュメントでは、Google Cloud の次の課金対象のコンポーネントを使用します。

  • Dataproc
  • Compute Engine
  • Cloud Composer

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。 新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

始める前に

プロジェクトを設定する

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc, Compute Engine, and Cloud Composer APIs.

    Enable the APIs

  5. Install the Google Cloud CLI.
  6. To initialize the gcloud CLI, run the following command:

    gcloud init
  7. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  8. Make sure that billing is enabled for your Google Cloud project.

  9. Enable the Dataproc, Compute Engine, and Cloud Composer APIs.

    Enable the APIs

  10. Install the Google Cloud CLI.
  11. To initialize the gcloud CLI, run the following command:

    gcloud init

Dataproc ワークフロー テンプレートの作成

ローカル ターミナルウィンドウまたは Cloud Shell で以下に表示されているコマンドをコピーして実行し、ワークフロー テンプレートを作成して定義します。

  1. sparkpi ワークフロー テンプレートを作成します。
    gcloud dataproc workflow-templates create sparkpi \
        --region=us-central1
          
  2. Spark ジョブを sparkpi ワークフロー テンプレートに追加します。「compute」 step-id フラグは、SparkPi ジョブを示します。
    gcloud dataproc workflow-templates add-job spark \
        --workflow-template=sparkpi \
        --step-id=compute \
        --class=org.apache.spark.examples.SparkPi \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --region=us-central1 \
        -- 1000
          
  3. ワークフローを実行するには、マネージド単一ノードクラスタを使用します。Dataproc によってクラスタが作成され、ワークフローがクラスタ上で実行され、ワークフローの完了時にクラスタが削除されます。
    gcloud dataproc workflow-templates set-managed-cluster sparkpi \
        --cluster-name=sparkpi \
        --single-node \
        --region=us-central1
          
  4. ワークフロー テンプレートの作成を確認します。

    コンソール

    Google Cloud コンソールの [Dataproc ワークフロー] ページで sparkpi 名をクリックし、[ワークフロー テンプレートの詳細] ページを開きます。ワークフロー テンプレートの名前をクリックして、sparkpi テンプレート属性を確認します。

    gcloud コマンド

    次のコマンドを実行します。

    gcloud dataproc workflow-templates describe sparkpi --region=us-central1
        

DAG を作成して Cloud Storage にアップロードする

  1. Cloud Composer 環境を作成するか、既存の環境を使用します。
  2. 環境変数を設定します。

    Airflow UI

    1. ツールバーで、[Admin] > [Variables] をクリックします。
    2. [作成] をクリックします。
    3. 次の情報を入力します。
      • Key:project_id
      • Val: PROJECT_ID - Google Cloud プロジェクト ID
    4. [保存] をクリックします。

    gcloud コマンド

    次のコマンドを入力します。

    • ENVIRONMENT は、Cloud Composer 環境の名前です。
    • LOCATION は、Cloud Composer 環境が配置されているリージョンです。
    • PROJECT_ID は、Cloud Composer 環境を含むプロジェクトのプロジェクト ID です。
        gcloud composer environments run ENVIRONMENT --location LOCATION variables set -- project_id PROJECT_ID
        
  3. 次の DAG コードを、ローカルで「composer-dataproc-dag.py」という名前のファイルにコピーします。このコードは、DataprocWorkflowTemplateInstantiateOperator を使用します。

    Airflow 2

    
    """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
    Spark Pi Job.
    
    This DAG relies on an Airflow variable
    https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
    * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
    """
    
    import datetime
    
    from airflow import models
    from airflow.providers.google.cloud.operators.dataproc import (
        DataprocInstantiateWorkflowTemplateOperator,
    )
    from airflow.utils.dates import days_ago
    
    project_id = "{{var.value.project_id}}"
    
    
    default_args = {
        # Tell airflow to start one day ago, so that it runs as soon as you upload it
        "start_date": days_ago(1),
        "project_id": project_id,
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
        # The id you will see in the DAG airflow page
        "dataproc_workflow_dag",
        default_args=default_args,
        # The interval with which to schedule the DAG
        schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
    ) as dag:
        start_template_job = DataprocInstantiateWorkflowTemplateOperator(
            # The task id of your job
            task_id="dataproc_workflow_dag",
            # The template id of your workflow
            template_id="sparkpi",
            project_id=project_id,
            # The region for the template
            region="us-central1",
        )
    

    Airflow 1

    
    """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
    Spark Pi Job.
    
    This DAG relies on an Airflow variable
    https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
    * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
    """
    
    import datetime
    
    from airflow import models
    from airflow.contrib.operators import dataproc_operator
    from airflow.utils.dates import days_ago
    
    project_id = "{{var.value.project_id}}"
    
    
    default_args = {
        # Tell airflow to start one day ago, so that it runs as soon as you upload it
        "start_date": days_ago(1),
        "project_id": project_id,
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
        # The id you will see in the DAG airflow page
        "dataproc_workflow_dag",
        default_args=default_args,
        # The interval with which to schedule the DAG
        schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
    ) as dag:
        start_template_job = dataproc_operator.DataprocWorkflowTemplateInstantiateOperator(
            # The task id of your job
            task_id="dataproc_workflow_dag",
            # The template id of your workflow
            template_id="sparkpi",
            project_id=project_id,
            # The region for the template
            # For more info on regions where Dataflow is available see:
            # https://cloud.google.com/dataflow/docs/resources/locations
            region="us-central1",
        )
    
  4. Cloud Storage の環境フォルダに DAG をアップロードします。アップロードが正常に完了したら、Cloud Composer 環境のページにある DAG フォルダのリンクをクリックします。

タスクのステータスの表示

Airflow UI

  1. Airflow ウェブ インターフェースを開きます。
  2. DAG ページで、DAG 名(例: dataproc_workflow_dag)をクリックします。
  3. DAG の詳細ページで、[Graph View] をクリックします。
  4. ステータスを確認します。
    • Failed: タスクの周囲に赤いボックスが表示されます。 タスクにカーソルを合わせて [State: Failed] を探すこともできます。タスクの周囲に失敗したことを示す赤いボックスが表示されている
    • Success: タスクの周囲に緑色のボックスが表示されます。 タスクにカーソルを合わせて [State: Success] を確認することもできます。タスクの周囲に成功したことを示す緑色のボックスが表示されている

Console

[Workflow] タブをクリックして、ワークフローのステータスを確認します。

gcloud コマンド

gcloud dataproc operations list \
    --region=us-central1 \
    --filter="labels.goog-dataproc-workflow-template-id=sparkpi"
    

クリーンアップ

Google Cloud アカウントに課金されないようにするには、このチュートリアルで使用したリソースを削除します。

  1. Cloud Composer 環境を削除します。

  2. ワークフロー テンプレートを削除します。

次のステップ