Airflow-DAGs schreiben

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

In dieser Anleitung erfahren Sie, wie Sie einen gerichteten azyklischen Graphen (Directed Acyclic Graph, DAG) in Apache Airflow für die Ausführung in einer Cloud Composer-Umgebung schreiben.

Da Apache Airflow keine strikte DAG- und Aufgabenisolation bietet, empfehlen wir, separate Produktions- und Testumgebungen einzurichten, um DAG-Interferenzen zu vermeiden. Weitere Informationen finden Sie unter DAGs testen.

Airflow-DAG strukturieren

Ein Airflow-DAG wird in einer Python-Datei definiert und besteht aus den folgenden Komponenten:

  • DAG-Definition
  • Airflow-Operatoren
  • Operatorbeziehungen

Die folgenden Code-Snippets zeigen Beispiele für die einzelnen Komponenten ohne Kontext.

Eine DAG-Definition

Das folgende Beispiel zeigt eine Airflow-DAG-Definition:

Airflow 2

import datetime

from airflow import models

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

Airflow 1

import datetime

from airflow import models

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

Operatoren und Aufgaben

Airflow-Operatoren beschreiben die auszuführende Arbeit. Eine Aufgabe ist eine bestimmte Instanz eines Operators.

Airflow 2

from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = PythonOperator(task_id="hello", python_callable=greeting)

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = BashOperator(task_id="bye", bash_command="echo Goodbye.")

Airflow 1

from airflow.operators import bash_operator
from airflow.operators import python_operator

    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id="hello", python_callable=greeting
    )

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id="bye", bash_command="echo Goodbye."
    )

Aufgabenbeziehungen

Aufgabenbeziehungen beschreiben die Reihenfolge, in der die Arbeit erledigt werden muss.

Airflow 2

# Define the order in which the tasks complete by using the >> and <<
# operators. In this example, hello_python executes before goodbye_bash.
hello_python >> goodbye_bash

Airflow 1

# Define the order in which the tasks complete by using the >> and <<
# operators. In this example, hello_python executes before goodbye_bash.
hello_python >> goodbye_bash

Vollständiges Beispiel für einen DAG-Workflow in Python

Der folgende Workflow ist eine vollständige Arbeits-DAG-Vorlage, die aus zwei Aufgaben besteht: hello_python und goodbye_bash:

Airflow 2


import datetime

from airflow import models

from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator



default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = PythonOperator(task_id="hello", python_callable=greeting)

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = BashOperator(task_id="bye", bash_command="echo Goodbye.")

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

Airflow 1


import datetime

from airflow import models

from airflow.operators import bash_operator
from airflow.operators import python_operator



default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id="hello", python_callable=greeting
    )

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id="bye", bash_command="echo Goodbye."
    )

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

Weitere Informationen zum Definieren von Airflow-DAGs finden Sie in der Airflow-Anleitung und in den Airflow-Konzepten.

Airflow-Operatoren

Die folgenden Beispiele enthalten einige beliebte Airflow-Operatoren. Eine verbindliche Referenz zu den Airflow-Operatoren finden Sie in der Referenz zu Operatoren und Hooks und im Anbieterindex.

BashOperator

Mit dem BashOperator können Sie Befehlszeilenprogramme ausführen.

Airflow 2

from airflow.operators import bash

    # Create BigQuery output dataset.
    make_bq_dataset = bash.BashOperator(
        task_id="make_bq_dataset",
        # Executing 'bq' command requires Google Cloud SDK which comes
        # preinstalled in Cloud Composer.
        bash_command=f"bq ls {bq_dataset_name} || bq mk {bq_dataset_name}",
    )

Airflow 1

from airflow.operators import bash_operator

    # Create BigQuery output dataset.
    make_bq_dataset = bash_operator.BashOperator(
        task_id="make_bq_dataset",
        # Executing 'bq' command requires Google Cloud SDK which comes
        # preinstalled in Cloud Composer.
        bash_command=f"bq ls {bq_dataset_name} || bq mk {bq_dataset_name}",
    )

Cloud Composer führt die bereitgestellten Befehle in einem Bash-Skript auf einem Airflow-Worker aus. Der Worker ist ein Docker-Container auf Basis von Debian und enthält mehrere Pakete.

PythonOperator

Verwenden Sie den PythonOperator, um beliebigen Python-Code auszuführen.

Cloud Composer führt den Python-Code in einem Container aus, der Pakete für die in Ihrer Umgebung verwendete Cloud Composer-Image-Version enthält.

Informationen zum Installieren weiterer Python-Pakete finden Sie unter Python-Abhängigkeiten installieren.

Google Cloud Operatoren

Verwenden Sie dieGoogle Cloud Airflow-Operatoren, um Aufgaben auszuführen, die Google Cloud -Produkte verwenden. BigQuery-Operatoren fragen Daten in BigQuery ab und verarbeiten sie.

Es gibt noch viele weitere Airflow-Operatoren für Google Cloud und einzelne Dienste von Google Cloud. Eine vollständige Liste finden Sie unter Google Cloud Operatoren.

Airflow 2

from airflow.providers.google.cloud.operators import bigquery
from airflow.providers.google.cloud.transfers import bigquery_to_gcs

    bq_recent_questions_query = bigquery.BigQueryInsertJobOperator(
        task_id="bq_recent_questions_query",
        configuration={
            "query": {
                "query": RECENT_QUESTIONS_QUERY,
                "useLegacySql": False,
                "destinationTable": {
                    "projectId": project_id,
                    "datasetId": bq_dataset_name,
                    "tableId": bq_recent_questions_table_id,
                },
            }
        },
        location=location,
    )

Airflow 1

from airflow.contrib.operators import bigquery_operator

    # Query recent StackOverflow questions.
    bq_recent_questions_query = bigquery_operator.BigQueryOperator(
        task_id="bq_recent_questions_query",
        sql="""
        SELECT owner_display_name, title, view_count
        FROM `bigquery-public-data.stackoverflow.posts_questions`
        WHERE creation_date < CAST('{max_date}' AS TIMESTAMP)
            AND creation_date >= CAST('{min_date}' AS TIMESTAMP)
        ORDER BY view_count DESC
        LIMIT 100
        """.format(
            max_date=max_query_date, min_date=min_query_date
        ),
        use_legacy_sql=False,
        destination_dataset_table=bq_recent_questions_table_id,
    )

EmailOperator

Verwenden Sie den EmailOperator, um E-Mails von einem DAG zu senden. Wenn Sie E-Mails aus einer Cloud Composer-Umgebung senden möchten, müssen Sie die Verwendung von SendGrid in der Umgebung konfigurieren.

Airflow 2

from airflow.operators import email

    # Send email confirmation (you will need to set up the email operator
    # See https://cloud.google.com/composer/docs/how-to/managing/creating#notification
    # for more info on configuring the email operator in Cloud Composer)
    email_summary = email.EmailOperator(
        task_id="email_summary",
        to="{{var.value.email}}",
        subject="Sample BigQuery notify data ready",
        html_content="""
        Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
        12AM. The most popular question was '{question_title}' with
        {view_count} views. Top 100 questions asked are now available at:
        {export_location}.
        """.format(
            min_date=min_query_date,
            max_date=max_query_date,
            question_title=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][0] }}"
            ),
            view_count=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][1] }}"
            ),
            export_location=output_file,
        ),
    )

Airflow 1

from airflow.operators import email_operator

    # Send email confirmation
    email_summary = email_operator.EmailOperator(
        task_id="email_summary",
        to="{{var.value.email}}",
        subject="Sample BigQuery notify data ready",
        html_content="""
        Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
        12AM. The most popular question was '{question_title}' with
        {view_count} views. Top 100 questions asked are now available at:
        {export_location}.
        """.format(
            min_date=min_query_date,
            max_date=max_query_date,
            question_title=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][0] }}"
            ),
            view_count=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][1] }}"
            ),
            export_location=output_file,
        ),
    )

Benachrichtigungen bei Fehlern des Mobilfunkanbieters

Zum Senden einer E-Mail-Benachrichtigung, wenn ein Operator im DAG fehlerhaft ist, legen Sie für email_on_failure den Wert True fest. Zum Senden von E-Mails aus einer Cloud Composer-Umgebung müssen Sie Ihre Umgebung für die Verwendung von SendGrid konfigurieren.

Airflow 2

from airflow import models

default_dag_args = {
    "start_date": yesterday,
    # Email whenever an Operator in the DAG fails.
    "email": "{{var.value.email}}",
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": project_id,
}

with models.DAG(
    "composer_sample_bq_notify",
    schedule_interval=datetime.timedelta(weeks=4),
    default_args=default_dag_args,
) as dag:

Airflow 1

from airflow import models

default_dag_args = {
    "start_date": yesterday,
    # Email whenever an Operator in the DAG fails.
    "email": "{{var.value.email}}",
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": "{{var.value.gcp_project}}",
}

with models.DAG(
    "composer_sample_bq_notify",
    schedule_interval=datetime.timedelta(weeks=4),
    default_args=default_dag_args,
) as dag:

Richtlinien für DAG-Workflows

  • Platzieren Sie benutzerdefinierte Python-Bibliotheken im ZIP-Archiv eines DAG in einem verschachtelten Verzeichnis. Platzieren Sie Bibliotheken nicht auf der obersten Ebene des DAG-Verzeichnisses.

    Wenn Airflow den Ordner dags/ durchsucht, prüft Airflow nur auf DAGs in Python-Modulen, die sich auf der obersten Ebene des Ordners „DAGs“ und auf der obersten Ebene eines ZIP-Archivs befinden, das sich ebenfalls im Ordner dags/ auf oberster Ebene befindet. Wenn Airflow in einem ZIP-Archiv ein Python-Modul ermittelt, das weder airflow- noch DAG-Teilstrings enthält, beendet Airflow die Verarbeitung des ZIP-Archivs. Airflow gibt nur die DAGs zurück, die bis zu diesem Zeitpunkt gefunden wurden.

  • Verwenden Sie Airflow 2 anstelle von Airflow 1.

    Die Airflow-Community veröffentlicht keine neuen Neben- oder Patchreleases mehr für Airflow 1.

  • Achten Sie aus Gründen der Fehlertoleranz darauf, nicht mehrere DAG-Objekte im gleichen Python-Modul zu definieren.

  • Verwenden Sie keine SubDAGs. Stattdessen sollten Sie Aufgaben in DAGs gruppieren.

  • Platzieren Sie Dateien, die zum Zeitpunkt des DAG-Parsens erforderlich sind, im Ordner dags/ und nicht im Ordner data/.

  • Einheitentests für Ihre DAGs implementieren

  • Testen Sie entwickelte oder geänderte DAGs wie in der Anleitung zum Testen von DAGs empfohlen.

  • Prüfen Sie, ob die entwickelten DAGs die DAG-Parsing-Zeiten zu stark erhöhen.

  • Airflow-Aufgaben können aus verschiedenen Gründen fehlschlagen. Um Fehler bei gesamten DAG-Ausführungen zu vermeiden, empfehlen wir, Wiederholungsversuche für Aufgaben zu aktivieren. Wenn Sie die maximale Anzahl von Wiederholungsversuchen auf 0 festlegen, werden keine Wiederholungsversuche ausgeführt.

    Wir empfehlen, die Option default_task_retries mit einem anderen Wert als 0 für die Wiederholungsversuche der Aufgabe zu überschreiben. Außerdem können Sie den Parameter retries auf Aufgabenebene festlegen.

  • Wenn Sie GPUs in Ihren Airflow-Aufgaben verwenden möchten, erstellen Sie einen separaten GKE-Cluster, der auf Knoten mit Maschinen mit GPUs basiert. Verwenden Sie GKEStartPodOperator, um Ihre Aufgaben auszuführen.

  • Vermeiden Sie die Ausführung von CPU- und arbeitsspeicherintensiven Aufgaben im Knotenpool des Clusters, in dem andere Airflow-Komponenten (Planer, Worker, Webserver) ausgeführt werden. Verwenden Sie stattdessen KubernetesPodOperator oder GKEStartPodOperator.

  • Wenn Sie DAGs in einer Umgebung bereitstellen, laden Sie nur die Dateien, die zum Interpretieren und Ausführen von DAGs unbedingt erforderlich sind, in den Ordner /dags hoch.

  • Beschränken Sie die Anzahl der DAG-Dateien im Ordner /dags.

    Airflow parst kontinuierlich DAGs im Ordner /dags. Beim Parsing wird der DAG-Ordner durchlaufen. Die Anzahl der Dateien, die geladen werden müssen (mit ihren Abhängigkeiten), wirkt sich auf die Leistung des DAG-Parsings und der Aufgabenplanung aus. Es ist viel effizienter, 100 Dateien mit jeweils 100 DAGs zu verwenden als 10.000 Dateien mit jeweils einem DAG. Eine solche Optimierung wird daher empfohlen. Diese Optimierung ist ein Kompromiss zwischen Parsing-Zeit und Effizienz der DAG-Erstellung und ‑Verwaltung.

    Wenn Sie beispielsweise 10.000 DAG-Dateien bereitstellen möchten, können Sie 100 ZIP-Dateien erstellen, die jeweils 100 DAG-Dateien enthalten.

    Wenn Sie mehr als 10.000 DAG-Dateien haben, kann es sinnvoll sein, DAGs programmatisch zu generieren. Sie können beispielsweise eine einzelne Python-DAG-Datei implementieren, die eine bestimmte Anzahl von DAG-Objekten generiert (z. B. 20 oder 100).

  • Verwenden Sie keine veralteten Airflow-Operatoren. Verwenden Sie stattdessen die aktuellen Alternativen.

FAQs zum Schreiben von DAGs

Wie minimiere ich Codewiederholungen, wenn ich die gleichen oder ähnliche Aufgaben in mehreren DAGs ausführen möchte?

Wir empfehlen das Definieren von Bibliotheken und Wrappern, um Codewiederholungen zu reduzieren.

Wie kann ich Code in mehreren DAG-Dateien wiederverwenden?

Binden Sie Hilfsfunktionen in eine lokale Python-Bibliothek ein und importieren Sie die Funktionen. Sie können in allen DAGs, die sich im dags/-Ordner Ihres Buckets befinden, auf die Funktionen verweisen.

Wie minimiere ich das Risiko unterschiedlicher Definitionen?

Angenommen, es gibt zwei Teams, die Rohdaten zu Umsatzkennzahlen zusammenfassen möchten. Die Teams schreiben zwei geringfügig unterschiedliche Aufgaben für den gleichen Sachverhalt. Definieren Sie Bibliotheken für die Arbeit mit den Umsatzdaten, sodass diejenigen, die DAGs implementieren, die Definition des zusammengefassten Umsatzes eindeutig festlegen müssen.

Wie lege ich Abhängigkeiten zwischen DAGs fest?

Das hängt davon ab, wie Sie die Abhängigkeit definieren möchten.

Wenn Sie zwei DAGs haben (DAG A und DAG B) und DAG B nach DAG A ausgelöst werden soll, können Sie einen TriggerDagRunOperator am Ende von DAG A platzieren.

Wenn DAG B nur von einem von DAG A generierten Artefakt abhängt (z. B. eine Pub/Sub-Meldung), ist ein Sensor möglicherweise besser geeignet.

Wenn DAG B eng mit DAG A integriert ist, können Sie die beiden DAGs möglicherweise in einen DAG zusammenführen.

Wie übergebe ich eindeutige Ausführungs-IDs an einen DAG und die zugehörigen Aufgaben?

Angenommen, es sollen Dataproc-Clusternamen und -Dateipfade übergeben werden.

In diesem Fall können Sie eine zufällige eindeutige ID generieren und dafür str(uuid.uuid4()) in einem PythonOperator zurückgeben. Dadurch wird die ID in XComs abgelegt, sodass Sie in anderen Operatoren über Vorlagenfelder darauf verweisen können.

Prüfen Sie vor dem Generieren einer uuid, ob eine DagRun-spezifische ID sinnvoller wäre. Sie können auf diese IDs in Jinja-Substitutionen auch mit Makros verweisen.

Wie trenne ich Aufgaben in einem DAG?

Eine Aufgabe sollte eine idempotente Arbeitseinheit sein. Vermeiden Sie es deshalb, einen aus mehreren Schritten bestehenden Workflow in eine einzelne Aufgabe aufzunehmen, z. B. in ein komplexes Programm, das in einem PythonOperator ausgeführt wird.

Soll ich mehrere Aufgaben in einem einzelnen DAG definieren, um Daten aus mehreren Quellen zusammenzufassen?

Angenommen, ich habe mehrere Tabellen mit Rohdaten und möchte tägliche Zusammenfassungen für jede einzelne Tabelle erstellen. Die Aufgaben sind nicht voneinander abhängig. Soll ich eine Aufgabe und einen DAG für jede Tabelle oder einen allgemeinen DAG erstellen?

Wenn es für Sie kein Problem ist, dass jede Aufgabe die gleichen Attribute auf DAG-Ebene verwendet (z. B. schedule_interval), ist es sinnvoll, mehrere Aufgaben in einem einzigen DAG zu definieren. Andernfalls können zur Minimierung der Codewiederholung mehrere DAGs aus einem einzigen Python-Modul generiert werden. Dazu platzieren Sie diese in den globalen globals() des Moduls.

Wie beschränke ich die Anzahl gleichzeitiger Aufgaben, die in einem DAG ausgeführt werden?

Ich möchte z. B. vermeiden, dass API-Nutzungslimits und -kontingente überschritten oder zu viele Prozesse gleichzeitig ausgeführt werden.

Sie können dazu Airflow-Pools in der Airflow-Weboberfläche definieren und in Ihren DAGs Aufgaben mit vorhandenen Pools verknüpfen.

FAQs zur Verwendung von Operatoren

Soll ich den DockerOperator verwenden?

Wir raten von der Verwendung des DockerOperator ab, es sei denn, er wird zum Starten von Containern in einer Remote-Docker-Installation verwendet (nicht im Cluster einer Umgebung). In einer Cloud Composer-Umgebung hat der Operator keinen Zugriff auf Docker-Daemons.

Verwenden Sie stattdessen KubernetesPodOperator oder GKEStartPodOperator. Diese Operatoren starten Kubernetes-Pods in Kubernetes- bzw. GKE-Clustern. Es ist nicht empfehlenswert, Pods im Cluster einer Umgebung zu starten, da dies zu Konkurrenz um Ressourcen führen kann.

Soll ich den SubDagOperator verwenden?

Die Verwendung von SubDagOperator wird nicht empfohlen.

Verwenden Sie Alternativen, wie unter Aufgaben gruppieren beschrieben.

Soll ich Python-Code nur in PythonOperators ausführen, um Python-Operatoren vollständig zu trennen?

Abhängig von Ihrem Ziel haben Sie mehrere Optionen.

Falls Ihr einziges Ziel ist, separate Python-Abhängigkeiten beizubehalten, können Sie PythonVirtualenvOperator verwenden.

Erwägen Sie, den KubernetesPodOperator zu verwenden. Mit diesem Operator können Sie Kubernetes-Pods definieren und die Pods in anderen Clustern ausführen.

Wie füge ich benutzerdefinierte binäre oder Nicht-PyPI-Pakete hinzu?

Sie können dazu Pakete installieren, die in privaten Paket-Repositories gehostet werden,

Wie übergebe ich Argumente einheitlich an einen DAG und die zugehörigen Aufgaben?

Sie können die integrierte Airflow-Unterstützung für Jinja-Vorlagen nutzen, um Argumente zu übergeben, die in Vorlagenfeldern verwendet werden können.

Wann findet die Vorlagenersetzung statt?

Die Vorlagen werden auf den Airflow-Workern unmittelbar vor dem Aufruf der pre_execute-Funktion eines Operators ersetzt. In der Praxis bedeutet dies, dass Vorlagen erst unmittelbar vor der Ausführung einer Aufgabe ersetzt werden.

Wie kann ich erkennen, welche Operatorargumente die Vorlagenersetzung unterstützen?

Operatorargumente, die die Jinja2-Vorlagenersetzung unterstützen, sind explizit entsprechend gekennzeichnet.

Suchen Sie in der Operatordefinition nach dem Feld template_fields. Es enthält eine Liste der Argumentnamen, für die die Vorlagenersetzung verwendet wird.

Dazu gehört beispielsweise der BashOperator, mit dem Vorlagen für die Argumente bash_command und env unterstützt werden.

Eingestellte und entfernte Airflow-Operatoren

Die in der folgenden Tabelle aufgeführten Airflow-Operatoren sind veraltet:

  • Vermeiden Sie die Verwendung dieser Operatoren in Ihren DAGs. Verwenden Sie stattdessen die bereitgestellten aktuellen Ersatzoperatoren.

  • Wenn ein Operator als verfügbar aufgeführt ist, bedeutet das, dass er in der neuesten Wartungsversion von Cloud Composer (1.20.12) weiterhin verfügbar ist.

  • Einige der Ersatzoperatoren werden in keiner Version von Cloud Composer 1 unterstützt. Wenn Sie sie verwenden möchten, sollten Sie ein Upgrade auf Cloud Composer 3 oder Cloud Composer 2 in Betracht ziehen.

Veralteter Operator Status Ersatzoperator Ersatzgerät verfügbar ab
CreateAutoMLTextTrainingJobOperator Verfügbar in Version 1.20.12 SupervisedFineTuningTrainOperator Ersatzoperator nicht verfügbar
GKEDeploymentHook Verfügbar in Version 1.20.12 GKEKubernetesHook Ersatzoperator nicht verfügbar
GKECustomResourceHook Verfügbar in Version 1.20.12 GKEKubernetesHook Ersatzoperator nicht verfügbar
GKEPodHook Verfügbar in Version 1.20.12 GKEKubernetesHook Ersatzoperator nicht verfügbar
GKEJobHook Verfügbar in Version 1.20.12 GKEKubernetesHook Ersatzoperator nicht verfügbar
GKEPodAsyncHook Verfügbar in Version 1.20.12 GKEKubernetesAsyncHook Ersatzoperator nicht verfügbar
SecretsManagerHook Verfügbar in Version 1.20.12 GoogleCloudSecretManagerHook Ersatzoperator nicht verfügbar
BigQueryExecuteQueryOperator Verfügbar in Version 1.20.12 BigQueryInsertJobOperator Verfügbar in Version 1.20.12
BigQueryPatchDatasetOperator Verfügbar in Version 1.20.12 BigQueryUpdateDatasetOperator Verfügbar in Version 1.20.12
DataflowCreateJavaJobOperator Verfügbar in Version 1.20.12 beam.BeamRunJavaPipelineOperator Verfügbar in Version 1.20.12
DataflowCreatePythonJobOperator Verfügbar in Version 1.20.12 beam.BeamRunPythonPipelineOperator Verfügbar in Version 1.20.12
DataprocSubmitPigJobOperator Verfügbar in Version 1.20.12 DataprocSubmitJobOperator Verfügbar in Version 1.20.12
DataprocSubmitHiveJobOperator Verfügbar in Version 1.20.12 DataprocSubmitJobOperator Verfügbar in Version 1.20.12
DataprocSubmitSparkSqlJobOperator Verfügbar in Version 1.20.12 DataprocSubmitJobOperator Verfügbar in Version 1.20.12
DataprocSubmitSparkJobOperator Verfügbar in Version 1.20.12 DataprocSubmitJobOperator Verfügbar in Version 1.20.12
DataprocSubmitHadoopJobOperator Verfügbar in Version 1.20.12 DataprocSubmitJobOperator Verfügbar in Version 1.20.12
DataprocSubmitPySparkJobOperator Verfügbar in Version 1.20.12 DataprocSubmitJobOperator Verfügbar in Version 1.20.12
BigQueryTableExistenceAsyncSensor Verfügbar in Version 1.20.12 BigQueryTableExistenceSensor Ersatzoperator nicht verfügbar
BigQueryTableExistencePartitionAsyncSensor Verfügbar in Version 1.20.12 BigQueryTablePartitionExistenceSensor Ersatzoperator nicht verfügbar
CloudComposerEnvironmentSensor Verfügbar in Version 1.20.12 CloudComposerCreateEnvironmentOperator, CloudComposerDeleteEnvironmentOperator, CloudComposerUpdateEnvironmentOperator Ersatzoperator nicht verfügbar
GCSObjectExistenceAsyncSensor Verfügbar in Version 1.20.12 GCSObjectExistenceSensor Ersatzoperator nicht verfügbar
GoogleAnalyticsHook Verfügbar in Version 1.20.12 GoogleAnalyticsAdminHook Ersatzoperator nicht verfügbar
GoogleAnalyticsListAccountsOperator Verfügbar in Version 1.20.12 GoogleAnalyticsAdminListAccountsOperator Ersatzoperator nicht verfügbar
GoogleAnalyticsGetAdsLinkOperator Verfügbar in Version 1.20.12 GoogleAnalyticsAdminGetGoogleAdsLinkOperator Ersatzoperator nicht verfügbar
GoogleAnalyticsRetrieveAdsLinksListOperator Verfügbar in Version 1.20.12 GoogleAnalyticsAdminListGoogleAdsLinksOperator Ersatzoperator nicht verfügbar
GoogleAnalyticsDataImportUploadOperator Verfügbar in Version 1.20.12 GoogleAnalyticsAdminCreateDataStreamOperator Ersatzoperator nicht verfügbar
GoogleAnalyticsDeletePreviousDataUploadsOperator Verfügbar in Version 1.20.12 GoogleAnalyticsAdminDeleteDataStreamOperator Ersatzoperator nicht verfügbar
DataPipelineHook Verfügbar in Version 1.20.12 DataflowHook Ersatzoperator nicht verfügbar
CreateDataPipelineOperator Verfügbar in Version 1.20.12 DataflowCreatePipelineOperator Ersatzoperator nicht verfügbar
RunDataPipelineOperator Verfügbar in Version 1.20.12 DataflowRunPipelineOperator Ersatzoperator nicht verfügbar
AutoMLDatasetLink Verfügbar in Version 1.20.12 TranslationLegacyDatasetLink Ersatzoperator nicht verfügbar
AutoMLDatasetListLink Verfügbar in Version 1.20.12 TranslationDatasetListLink Ersatzoperator nicht verfügbar
AutoMLModelLink Verfügbar in Version 1.20.12 TranslationLegacyModelLink Ersatzoperator nicht verfügbar
AutoMLModelTrainLink Verfügbar in Version 1.20.12 TranslationLegacyModelTrainLink Ersatzoperator nicht verfügbar
AutoMLModelPredictLink Verfügbar in Version 1.20.12 TranslationLegacyModelPredictLink Ersatzoperator nicht verfügbar
AutoMLBatchPredictOperator Verfügbar in Version 1.20.12 vertex_ai.batch_prediction_job Ersatzoperator nicht verfügbar
AutoMLPredictOperator Verfügbar in Version 1.20.12 vertex_aigenerative_model. TextGenerationModelPredictOperator, translate.TranslateTextOperator Ersatzoperator nicht verfügbar
PromptLanguageModelOperator Verfügbar in Version 1.20.12 TextGenerationModelPredictOperator Ersatzoperator nicht verfügbar
GenerateTextEmbeddingsOperator Verfügbar in Version 1.20.12 TextEmbeddingModelGetEmbeddingsOperator Ersatzoperator nicht verfügbar
PromptMultimodalModelOperator Verfügbar in Version 1.20.12 GenerativeModelGenerateContentOperator Ersatzoperator nicht verfügbar
PromptMultimodalModelWithMediaOperator Verfügbar in Version 1.20.12 GenerativeModelGenerateContentOperator Ersatzoperator nicht verfügbar
DataflowStartSqlJobOperator Verfügbar in Version 1.20.12 DataflowStartYamlJobOperator Ersatzoperator nicht verfügbar
LifeSciencesHook Verfügbar in Version 1.20.12 Hook für Google Cloud Batch-Operatoren Wird noch angekündigt
DataprocScaleClusterOperator Verfügbar in Version 1.20.12 DataprocUpdateClusterOperator Wird noch angekündigt
MLEngineStartBatchPredictionJobOperator Verfügbar in Version 1.20.12 CreateBatchPredictionJobOperator Wird noch angekündigt
MLEngineManageModelOperator Verfügbar in Version 1.20.12 MLEngineCreateModelOperator, MLEngineGetModelOperator Wird noch angekündigt
MLEngineGetModelOperator Verfügbar in Version 1.20.12 GetModelOperator Wird noch angekündigt
MLEngineDeleteModelOperator Verfügbar in Version 1.20.12 DeleteModelOperator Wird noch angekündigt
MLEngineManageVersionOperator Verfügbar in Version 1.20.12 MLEngineCreateVersion, MLEngineSetDefaultVersion, MLEngineListVersions, MLEngineDeleteVersion Wird noch angekündigt
MLEngineCreateVersionOperator Verfügbar in Version 1.20.12 Parameter „parent_model“ für Vertex AI-Operatoren Wird noch angekündigt
MLEngineSetDefaultVersionOperator Verfügbar in Version 1.20.12 SetDefaultVersionOnModelOperator Wird noch angekündigt
MLEngineListVersionsOperator Verfügbar in Version 1.20.12 ListModelVersionsOperator Wird noch angekündigt
MLEngineDeleteVersionOperator Verfügbar in Version 1.20.12 DeleteModelVersionOperator Wird noch angekündigt
MLEngineStartTrainingJobOperator Verfügbar in Version 1.20.12 CreateCustomPythonPackageTrainingJobOperator Wird noch angekündigt
MLEngineTrainingCancelJobOperator Verfügbar in Version 1.20.12 CancelCustomTrainingJobOperator Wird noch angekündigt
LifeSciencesRunPipelineOperator Verfügbar in Version 1.20.12 Google Cloud Batch-Operatoren Wird noch angekündigt
MLEngineCreateModelOperator Verfügbar in Version 1.20.12 entsprechender Vertex AI-Operator Wird noch angekündigt

Nächste Schritte