Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Apache Airflow bietet eine Befehlszeile, mit der Sie Aufgaben wie das Auslösen und Verwalten von DAGs, das Abrufen von Informationen zu DAG-Ausführungen und -Aufgaben, das Hinzufügen und Löschen von Verbindungen und Nutzern ausführen können.
Unterstützte Befehle der Airflow-Befehlszeile
Airflow verwendet die Airflow 2-Befehlszeilensyntax, die in der Airflow-Dokumentation beschrieben ist.
Eine vollständige Liste der unterstützten Airflow-Befehlszeilenbefehle finden Sie in der Referenz zum Befehl gcloud composer environments run
.
Hinweise
Sie benötigen Berechtigungen, um die Google Cloud CLI mit Cloud Composer zu verwenden und Airflow-Befehlszeilenbefehle auszuführen.
In Cloud Composer-Versionen vor 2.4.0 benötigen Sie Zugriff auf die Steuerungsebene des Clusters Ihrer Umgebung, um Airflow-Befehle auszuführen.
Befehle der Airflow-Befehlszeile ausführen
Zum Ausführen von Befehlen der Airflow-Befehlszeile in Ihren Umgebungen verwenden Sie die gcloud CLI:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
SUBCOMMAND \
-- SUBCOMMAND_ARGUMENTS
Ersetzen Sie Folgendes :
ENVIRONMENT_NAME
: der Name Ihrer UmgebungLOCATION
: die Region, in der sich die Umgebung befindet.SUBCOMMAND
: einer der unterstützten Airflow-Befehlszeilenbefehle.SUBCOMMAND_ARGUMENTS
durch Argumente für den Airflow-Befehlszeilenbefehl.
Trennzeichen für Unterbefehle
Trennen Sie die Argumente für den angegebenen Airflow-Befehlszeilenbefehl durch --
:
- Geben Sie komplexe Unterbefehle als Unterbefehl an.
- Geben Sie nach dem
--
-Trennzeichen Argumente für zusammengesetzte Befehle als Unterbefehlsargumente an.
Beispiel:
gcloud composer environments run example-environment \
dags list -- --output=json
Standardort
Für die meisten gcloud composer
-Befehle ist ein Speicherort erforderlich. Sie können den Speicherort mit dem Flag --location
oder durch Festlegen des Standardspeicherorts angeben.
Beispielsweise haben Sie die Möglichkeit, mit dem folgenden Befehl einen DAG namens sample_quickstart
mit der ID 5077
in Ihrer Cloud Composer-Umgebung auszulösen:
gcloud composer environments run example-environment \
--location us-central1 dags trigger -- sample_quickstart \
--run-id=5077
Befehle in einer privaten IP-Umgebung ausführen
In Cloud Composer-Versionen vor 2.4.0:
Zum Ausführen von Befehlen der Airflow-Befehlszeile in einer privaten IP-Umgebung müssen Sie sie auf einem Computer aufrufen, der Zugriff auf den Endpunkt der Steuerungsebene des GKE-Cluster hat. Die Optionen variieren dabei abhängig von Ihrer privaten Clusterkonfiguration.
Wenn der Zugriff auf öffentliche Endpunkte im Cluster Ihrer Umgebung deaktiviert ist, können Sie die Airflow-Befehlszeile nicht mit gcloud composer
-Befehlen ausführen.
Führen Sie die folgenden Schritte aus, um Airflow-Befehle in der Befehlszeile ausführen zu können:
- VM in Ihrem VPC-Netzwerk erstellen
Rufen Sie die Clusteranmeldedaten ab. Führen Sie dazu diesen Befehl aus:
gcloud container clusters get-credentials CLUSTER_NAME \ --region REGION \ --project PROJECT \ --internal-ip
Verwenden Sie kubectl
, um den Airflow-Befehl auszuführen. Beispiel:
kubectl exec deployment/airflow-scheduler -n COMPOSER_NAMESPACE \
--container airflow-scheduler -- airflow dags list
Ersetzen Sie COMPOSER_NAMESPACE
durch einen Namespace wie composer-2-0-28-airflow-2-3-394zxc12411
. Sie finden Ihre Cloud Composer-Anwendung in der Liste der Arbeitslasten oder mit dem Befehl kubectl get namespaces
.
Wenn der Zugriff auf öffentliche Endpunkte im Cluster Ihrer Umgebung aktiviert ist, können Sie Airflow-Befehle auch von einem Computer mit einer externen IP-Adresse ausführen, die zu autorisierten Netzwerken hinzugefügt wird. Wenn Sie den Zugriff von Ihrem Computer aus aktivieren möchten, fügen Sie die externe Adresse Ihres Computers der Liste der autorisierten Netzwerke Ihrer Umgebung hinzu.
Wenn Sie gcloud composer environments run
- oder kubectl
-Befehle ausführen, kann der folgende Fehler auftreten:
Get "https://<IP Address>/api?timeout=32s": dial tcp <IP Address>:443: i/o timeout"
Symptom: Diese Fehlermeldung gibt an, dass von einem Computer, auf dem Sie diese Befehle ausführen, keine Netzwerkverbindung besteht.
Lösung: Folgen Sie der Anleitung im Abschnitt Befehle in einer privaten IP-Umgebung ausführen oder verwenden Sie die Anleitung im Abschnitt kubectl
-Befehl läuft ab.
Airflow-Befehle über die Cloud Composer API ausführen
Ab Cloud Composer-Version 2.4.0 können Sie Airflow-Befehlszeilen über die Cloud Composer API ausführen.
Befehl ausführen
Erstellen Sie eine environments.executeAirflowCommand
-API-Anfrage:
{
"environment": "projects/PROJECT_ID/locations/LOCATION/environments/ENVIRONMENT_NAME",
"command": "AIRFLOW_COMMAND",
"subcommand": "AIRFLOW_SUBCOMMAND",
"parameters": [
"SUBCOMMAND_PARAMETER"
]
}
Ersetzen Sie Folgendes:
PROJECT_ID
: die Projekt-ID.LOCATION
: die Region, in der sich die Umgebung befindet.ENVIRONMENT_NAME
: der Name Ihrer UmgebungAIRFLOW_COMMAND
: Airflow-Befehlszeilenbefehl, den Sie ausführen möchten, z. B.dags
.AIRFLOW_SUBCOMMAND
: Unterbefehl für den Airflow-Befehlszeilenbefehl, den Sie ausführen möchten, z. B.list
.- Optional:
SUBCOMMAND_PARAMETER
: Parameter für den untergeordneten Befehl. Wenn Sie mehrere Parameter verwenden möchten, fügen Sie der Liste weitere Elemente hinzu.
Beispiel:
// POST https://composer.googleapis.com/v1/{environment=projects/*/locations/*/environments/*}:executeAirflowCommand
{
"environment": "projects/example-project/locations/us-central1/environments/example-environment",
"command": "dags",
"subcommand": "list",
"parameters": [
"-o json",
"--verbose"
]
}
Status des Abfragebefehls
Nachdem Sie einen Airflow-Befehl über die Cloud Composer API ausgeführt haben, prüfen Sie, ob der Befehl erfolgreich abgeschlossen wurde. Stellen Sie dazu eine PollAirflowCommand-Anfrage und prüfen Sie die Felder in exitInfo
auf Fehler und Statuscodes. Das Feld output
enthält Logzeilen.
Wenn Sie den Status der Befehlsausführung abrufen und Protokolle abrufen möchten, geben Sie die von ExecuteAirflowCommandRequest
zurückgegebenen Werte executionId
, pod
und podNamespace
an:
Beispiel:
// POST https://composer.googleapis.com/v1/{environment=projects/*/locations/*/environments/*}:pollAirflowCommand
{
"executionId": "39b82312-3a19-4d21-abac-7f8f19855ce7",
"pod": "airflow-scheduler-1327d8cd68-hblpd",
"podNamespace": "composer-2-4-0-airflow-2-5-3-184dadaf",
"nextLineNumber": 1
}