Workflow mit Cloud Composer

In diesem Dokument verwenden Sie die folgenden kostenpflichtigen Komponenten von Google Cloud:

  • Dataproc
  • Compute Engine
  • Cloud Composer

Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen.

Neuen Google Cloud Nutzern steht möglicherweise eine kostenlose Testversion zur Verfügung.

Hinweise

Projekt einrichten

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc, Compute Engine, and Cloud Composer APIs.

    Enable the APIs

  5. Install the Google Cloud CLI.

  6. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  7. To initialize the gcloud CLI, run the following command:

    gcloud init
  8. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  9. Make sure that billing is enabled for your Google Cloud project.

  10. Enable the Dataproc, Compute Engine, and Cloud Composer APIs.

    Enable the APIs

  11. Install the Google Cloud CLI.

  12. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  13. To initialize the gcloud CLI, run the following command:

    gcloud init
  14. Dataproc-Workflow-Vorlage erstellen

    Kopieren Sie die folgenden Befehle und führen Sie sie in einem lokalen Terminalfenster oder in Cloud Shell aus, um eine Workflow-Vorlage zu erstellen und zu definieren.

    1. Erstellen Sie die sparkpi-Workflow-Vorlage.
      gcloud dataproc workflow-templates create sparkpi \
          --region=us-central1
            
    2. Fügen Sie der Workflow-Vorlage sparkpi den Spark-Job hinzu. Das „compute“-Flag step-id identifiziert den SparkPi-Job.
      gcloud dataproc workflow-templates add-job spark \
          --workflow-template=sparkpi \
          --step-id=compute \
          --class=org.apache.spark.examples.SparkPi \
          --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
          --region=us-central1 \
          -- 1000
            
    3. Führen Sie den Workflow mit einem verwalteten Cluster mit einem einzelnen Knoten aus. Dataproc erstellt den Cluster, führt darauf den Workflow aus und löscht den Cluster, wenn der Workflow abgeschlossen ist.
      gcloud dataproc workflow-templates set-managed-cluster sparkpi \
          --cluster-name=sparkpi \
          --single-node \
          --region=us-central1
            
    4. Bestätigen Sie das Erstellen der Workflow-Vorlage.

      Console

      Klicken Sie in der Google Cloud Console auf der Dataproc-Seite Workflows auf den Namen sparkpi, um die Seite Workflow-Vorlagendetails zu öffnen. Klicken Sie auf den Namen Ihrer Workflow-Vorlage, um die sparkpi-Vorlagenattribute zu bestätigen.

      gcloud-Befehl

      Führen Sie dazu diesen Befehl aus:

      gcloud dataproc workflow-templates describe sparkpi --region=us-central1
          

    DAG erstellen und in Cloud Storage hochladen

    1. Eine Cloud Composer-Umgebung erstellen oder eine vorhandene verwenden
    2. Umgebungsvariablen festlegen

      Airflow-UI

      1. Klicken Sie in der Symbolleiste auf Admin > Variables.
      2. Klicken Sie auf Erstellen.
      3. Geben Sie die folgenden Informationen ein:
        • Key: project_id
        • Val: PROJECT_ID – Ihre Google Cloud Projekt-ID
      4. Klicken Sie auf Speichern.

      gcloud-Befehl

      Geben Sie die folgenden Befehle ein:

      • ENVIRONMENT ist der Name der Cloud Composer-Umgebung.
      • LOCATION ist die Region, in der sich die Cloud Composer-Umgebung befindet.
      • PROJECT_ID ist die Projekt-ID des Projekts, das die Cloud Composer-Umgebung enthält.
          gcloud composer environments run ENVIRONMENT --location LOCATION variables set -- project_id PROJECT_ID
          
    3. Kopieren Sie den folgenden DAG-Code lokal in eine Datei namens „composer-dataproc-dag.py“, die den DataprocInstantiateWorkflowTemplateOperator verwendet.

      Airflow 2

      
      """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
      Spark Pi Job.
      
      This DAG relies on an Airflow variable
      https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
      * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
      """
      
      import datetime
      
      from airflow import models
      from airflow.providers.google.cloud.operators.dataproc import (
          DataprocInstantiateWorkflowTemplateOperator,
      )
      from airflow.utils.dates import days_ago
      
      project_id = "{{var.value.project_id}}"
      
      
      default_args = {
          # Tell airflow to start one day ago, so that it runs as soon as you upload it
          "start_date": days_ago(1),
          "project_id": project_id,
      }
      
      # Define a DAG (directed acyclic graph) of tasks.
      # Any task you create within the context manager is automatically added to the
      # DAG object.
      with models.DAG(
          # The id you will see in the DAG airflow page
          "dataproc_workflow_dag",
          default_args=default_args,
          # The interval with which to schedule the DAG
          schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
      ) as dag:
          start_template_job = DataprocInstantiateWorkflowTemplateOperator(
              # The task id of your job
              task_id="dataproc_workflow_dag",
              # The template id of your workflow
              template_id="sparkpi",
              project_id=project_id,
              # The region for the template
              region="us-central1",
          )
      

      Airflow 1

      
      """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
      Spark Pi Job.
      
      This DAG relies on an Airflow variable
      https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
      * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
      """
      
      import datetime
      
      from airflow import models
      from airflow.contrib.operators import dataproc_operator
      from airflow.utils.dates import days_ago
      
      project_id = "{{var.value.project_id}}"
      
      
      default_args = {
          # Tell airflow to start one day ago, so that it runs as soon as you upload it
          "start_date": days_ago(1),
          "project_id": project_id,
      }
      
      # Define a DAG (directed acyclic graph) of tasks.
      # Any task you create within the context manager is automatically added to the
      # DAG object.
      with models.DAG(
          # The id you will see in the DAG airflow page
          "dataproc_workflow_dag",
          default_args=default_args,
          # The interval with which to schedule the DAG
          schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
      ) as dag:
          start_template_job = dataproc_operator.DataprocWorkflowTemplateInstantiateOperator(
              # The task id of your job
              task_id="dataproc_workflow_dag",
              # The template id of your workflow
              template_id="sparkpi",
              project_id=project_id,
              # The region for the template
              # For more info on regions where Dataflow is available see:
              # https://cloud.google.com/dataflow/docs/resources/locations
              region="us-central1",
          )
      
    4. Führen Sie ein Upload des DAG in Ihren Umgebungsordner in Cloud Storage aus. Wenn der Upload erfolgreich abgeschlossen wurde, klicken Sie auf der Seite der Cloud Composer-Umgebung auf den Link DAGs-Ordner.

    Status einer Aufgabe ansehen

    Airflow-UI

    1. Öffnen Sie die Airflow-Weboberfläche.
    2. Klicken Sie auf der Seite "DAGs" auf den DAG-Namen, z. B. dataproc_workflow_dag.
    3. Klicken Sie auf der DAGs-Detailseite auf Grafikansicht.
    4. Status prüfen:
      • Fehlgeschlagen: Die Aufgabe ist rot umrandet. Sie können auch den Mauszeiger über die Aufgabe halten und nach State: Failed suchen. Die Aufgabe ist rot umrandet, was angibt, dass sie fehlgeschlagen ist.
      • Erfolgreich: Die Aufgabe ist grün umrandet. Sie können auch den Mauszeiger über die Aufgabe halten und nach State: Success suchen. Die Aufgabe ist grün umrandet, was angibt, dass sie erfolgreich ausgeführt wurde.

    Console

    Klicken Sie auf den Tab „Workflows“, um den Workflow-Status anzusehen.

    gcloud-Befehl

    gcloud dataproc operations list \
        --region=us-central1 \
        --filter="labels.goog-dataproc-workflow-template-id=sparkpi"
        

    Bereinigen

    Um zu vermeiden, dass Ihrem Google Cloud Konto Gebühren in Rechnung gestellt werden, können Sie die in dieser Anleitung verwendeten Ressourcen löschen:

    1. Löschen Sie die Cloud Composer-Umgebung.

    2. Löschen Sie die Workflow-Vorlage.

    Nächste Schritte