Apache Airflow-DAG in Cloud Composer 1 ausführen (Google Cloud CLI)

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

In dieser Kurzanleitung erfahren Sie, wie Sie eine Cloud Composer-Umgebung erstellen und einen Apache Airflow-DAG in Cloud Composer 1 ausführen.

Hinweise

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  4. To initialize the gcloud CLI, run the following command:

    gcloud init
  5. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Make sure that billing is enabled for your Google Cloud project.

  7. Install the Google Cloud CLI.

  8. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  9. To initialize the gcloud CLI, run the following command:

    gcloud init
  10. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  11. Make sure that billing is enabled for your Google Cloud project.

  12. Enable the Cloud Composer API:

    gcloud services enable composer.googleapis.com
  13. Bitten Sie Ihren Administrator, Ihnen die folgenden IAM-Rollen für Ihr Projekt zuzuweisen, um die Berechtigungen zu erhalten, die Sie zum Ausführen dieser Kurzanleitung benötigen:

    Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.

    Sie können die erforderlichen Berechtigungen auch über benutzerdefinierte Rollen oder andere vordefinierte Rollen erhalten.

Dienstkonto für eine Umgebung erstellen

Wenn Sie eine Umgebung erstellen, geben Sie ein Dienstkonto an. Dieses Dienstkonto wird als Dienstkonto der Umgebung bezeichnet. Ihre Umgebung verwendet dieses Dienstkonto für die meisten Vorgänge.

Das Dienstkonto für Ihre Umgebung ist kein Nutzerkonto. Ein Dienstkonto ist ein spezieller Kontotyp, der nicht von einer Person, sondern von einer Anwendung oder einer VM-Instanz verwendet wird.

So erstellen Sie ein Dienstkonto für Ihre Umgebung:

  1. Erstellen Sie ein neues Dienstkonto, wie in der Dokumentation zu Identity and Access Management beschrieben.

  2. Weisen Sie ihm eine Rolle zu, wie in der Dokumentation zu Identity and Access Management beschrieben. Die erforderliche Rolle ist Composer-Worker (composer.worker).

Umgebung erstellen

Erstellen Sie eine neue Umgebung mit dem Namen example-environment in der Region us-central1 mit der neuesten Cloud Composer 1-Version.

gcloud composer environments create example-environment \
    --location us-central1 \
    --image-version composer-1.20.12-airflow-1.10.15

DAG-Datei erstellen

Ein Airflow-DAG ist eine Sammlung strukturierter Aufgaben, die Sie planen und ausführen möchten. DAGs werden in Standard-Python-Dateien definiert.

In dieser Anleitung wird ein Beispiel-Airflow-DAG verwendet, der in der Datei quickstart.py definiert ist. Der Python-Code in dieser Datei führt folgende Aktionen aus:

  1. Erstellt einen DAG, composer_sample_dag. Dieser DAG wird jeden Tag ausgeführt.
  2. Die Aufgabe print_dag_run_conf wird ausgeführt. Sie gibt mithilfe des bash-Operators die Konfiguration der DAG-Ausführung aus.

Speichern Sie eine Kopie der Datei quickstart.py auf Ihrem lokalen Computer:

import datetime

from airflow import models
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with models.DAG(
    "composer_quickstart",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )

DAG-Datei in den Bucket Ihrer Umgebung hochladen

Jeder Cloud Composer-Umgebung ist ein Cloud Storage-Bucket zugeordnet. Airflow in Cloud Composer plant nur DAGs, die sich im Ordner /dags in diesem Bucket befinden.

Zum Planen Ihres DAG laden Sie quickstart.py von Ihrem lokalen Computer in den Ordner /dags Ihrer Umgebung hoch:

Führen Sie den folgenden Befehl in dem Ordner aus, in dem sich die quickstart.py-Datei befindet, um quickstart.py mit der Google Cloud CLI hochzuladen:

gcloud composer environments storage dags import \
--environment example-environment --location us-central1 \
--source quickstart.py

DAG ansehen

Nachdem Sie die DAG-Datei hochgeladen haben, führt Airflow die folgenden Schritte aus:

  1. Parst die von Ihnen hochgeladene DAG-Datei. Es kann einige Minuten dauern, bis der DAG in Airflow verfügbar ist.
  2. Fügt die DAG der Liste der verfügbaren DAGs hinzu.
  3. Führt den DAG gemäß dem Zeitplan aus, den Sie in der DAG-Datei angegeben haben.

Prüfen Sie, ob Ihr DAG fehlerfrei verarbeitet wurde und in Airflow verfügbar ist, indem Sie ihn in der DAG-Benutzeroberfläche aufrufen. Die DAG-Benutzeroberfläche ist die Cloud Composer-Oberfläche zum Ansehen von DAG-Informationen in der Google Cloud Console. Cloud Composer bietet auch Zugriff auf die Airflow-UI, eine native Airflow-Weboberfläche.

  1. Warten Sie etwa fünf Minuten, damit Airflow die DAG-Datei verarbeiten kann, die Sie zuvor hochgeladen haben, und der erste DAG-Lauf abgeschlossen werden kann (wird später erläutert).

  2. Führen Sie den folgenden Befehl in der Google Cloud CLI aus. Mit diesem Befehl wird der dags list Airflow-Befehlszeilenbefehl ausgeführt, mit dem DAGs in Ihrer Umgebung aufgeführt werden.

    gcloud composer environments run example-environment \
    --location us-central1 \
    dags list
    
  3. Prüfen Sie, ob die composer_quickstart-DAG in der Ausgabe des Befehls aufgeführt ist.

    Beispielausgabe:

    Executing the command: [ airflow dags list ]...
    Command has been started. execution_id=d49074c7-bbeb-4ee7-9b26-23124a5bafcb
    Use ctrl-c to interrupt the command
    dag_id              | filepath              | owner            | paused
    ====================+=======================+==================+=======
    airflow_monitoring  | airflow_monitoring.py | airflow          | False
    composer_quickstart | dag-quickstart-af2.py | Composer Example | False
    

Details zur DAG-Ausführung ansehen

Eine einzelne Ausführung eines DAG wird als DAG-Ausführung bezeichnet. Airflow führt sofort eine DAG-Ausführung für den Beispiel-DAG aus, da das Startdatum in der DAG-Datei auf „yesterday“ (gestern) festgelegt ist. So holt Airflow den Zeitplan des angegebenen DAG auf.

Der Beispiel-DAG enthält eine Aufgabe, print_dag_run_conf, die den Befehl echo in der Konsole ausführt. Mit diesem Befehl werden Metainformationen zum DAG ausgegeben (numerische ID des DAG-Laufs).

Führen Sie den folgenden Befehl in der Google Cloud CLI aus. Mit diesem Befehl werden DAG-Ausführungen für den composer_quickstart-DAG aufgelistet:

gcloud composer environments run example-environment \
--location us-central1 \
dags list-runs -- --dag-id composer_quickstart

Beispielausgabe:

dag_id              | run_id                                      | state   | execution_date                   | start_date                       | end_date
====================+=============================================+=========+==================================+==================================+=================================
composer_quickstart | scheduled__2024-02-17T15:38:38.969307+00:00 | success | 2024-02-17T15:38:38.969307+00:00 | 2024-02-18T15:38:39.526707+00:00 | 2024-02-18T15:38:42.020661+00:00

Die Airflow-Befehlszeile bietet keinen Befehl zum Aufrufen von Aufgabenlogs. Sie können auch andere Methoden verwenden, um Airflow-Aufgabenlogs anzusehen: Cloud Composer-DAG-UI, Airflow-UI oder Cloud Logging. In diesem Leitfaden wird beschrieben, wie Sie Cloud Logging nach Logs aus einem bestimmten DAG-Lauf abfragen.

Führen Sie den folgenden Befehl in der Google Cloud CLI aus. Mit diesem Befehl werden Logs aus Cloud Logging für einen bestimmten DAG-Lauf des DAG composer_quickstart gelesen. Mit dem Argument --format wird die Ausgabe so formatiert, dass nur der Text der Lognachricht angezeigt wird.

gcloud logging read \
--format="value(textPayload)" \
--order=asc \
"resource.type=cloud_composer_environment \
resource.labels.location=us-central1 \
resource.labels.environment_name=example-environment \
labels.workflow=composer_quickstart \
(labels.\"execution-date\"=\"RUN_ID\")"

Ersetzen Sie:

  • RUN_ID mit dem Wert run_id aus der Ausgabe des tasks states-for-dag-run-Befehls, den Sie zuvor ausgeführt haben. Beispiel: 2024-02-17T15:38:38.969307+00:00.

Beispielausgabe:

...

Starting attempt 1 of 2
Executing <Task(BashOperator): print_dag_run_conf> on 2024-02-17
15:38:38.969307+00:00
Started process 22544 to run task

...

Running command: ['/usr/bin/bash', '-c', 'echo 115746']
Output:
115746

...

Command exited with return code 0
Marking task as SUCCESS. dag_id=composer_quickstart,
task_id=print_dag_run_conf, execution_date=20240217T153838,
start_date=20240218T153841, end_date=20240218T153841
Task exited with return code 0
0 downstream tasks scheduled from follow-on schedule check

Bereinigen

Löschen Sie das Google Cloud -Projekt mit den Ressourcen, damit Ihrem Google Cloud -Konto die auf dieser Seite verwendeten Ressourcen nicht in Rechnung gestellt werden.

Löschen Sie die in dieser Anleitung verwendeten Ressourcen:

  1. Löschen Sie die Cloud Composer-Umgebung.

    1. Rufen Sie in der Google Cloud -Console die Seite Umgebungen auf.

      Zur Seite Umgebungen

    2. Wählen Sie example-environment aus und klicken Sie auf Löschen.

    3. Warten Sie, bis die Umgebung gelöscht ist.

  2. Löschen Sie den Bucket Ihrer Umgebung. Durch das Löschen der Cloud Composer-Umgebung wird dessen Bucket nicht gelöscht.

    1. Wechseln Sie in der Google Cloud Console zur Seite Speicher > Browser.

      Öffnen Sie "Storage" > "Browser".

    2. Wählen Sie den Bucket der Umgebung aus und klicken Sie auf Löschen. Dieser Bucket kann beispielsweise us-central1-example-environ-c1616fe8-bucket heißen.

Nächste Schritte