編寫 Airflow DAG

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本指南說明如何編寫在 Cloud Composer 環境中執行的 Apache Airflow 有向非循環圖 (DAG)。

Apache Airflow 不提供嚴密的 DAG 和工作隔離機制,因此建議您使用個別的實際工作環境和測試環境,以免發生 DAG 干擾情形。詳情請參閱測試 DAG 一文。

建構 Airflow DAG

Airflow DAG 是在 Python 檔案中定義,並且是由下列元件組成:

  • DAG 定義
  • Airflow 運算子
  • 運算子關係

下列程式碼片段會顯示去脈絡化的各元件範例。

DAG 定義

以下範例說明 Airflow DAG 定義:

Airflow 2

import datetime

from airflow import models

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

Airflow 1

import datetime

from airflow import models

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

運算子和工作

Airflow 運算子會描述待完成的工作。工作task 是運算子的特定例項。

Airflow 2

from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = PythonOperator(task_id="hello", python_callable=greeting)

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = BashOperator(task_id="bye", bash_command="echo Goodbye.")

Airflow 1

from airflow.operators import bash_operator
from airflow.operators import python_operator

    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id="hello", python_callable=greeting
    )

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id="bye", bash_command="echo Goodbye."
    )

工作關係

工作關係:描述工作必須完成的順序。

Airflow 2

# Define the order in which the tasks complete by using the >> and <<
# operators. In this example, hello_python executes before goodbye_bash.
hello_python >> goodbye_bash

Airflow 1

# Define the order in which the tasks complete by using the >> and <<
# operators. In this example, hello_python executes before goodbye_bash.
hello_python >> goodbye_bash

Python 中的完整 DAG 工作流程範例

下列工作流程是完整的有效 DAG 範本,其中包含兩項工作:hello_python 工作和 goodbye_bash 工作:

Airflow 2


import datetime

from airflow import models

from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator



default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = PythonOperator(task_id="hello", python_callable=greeting)

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = BashOperator(task_id="bye", bash_command="echo Goodbye.")

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

Airflow 1


import datetime

from airflow import models

from airflow.operators import bash_operator
from airflow.operators import python_operator



default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id="hello", python_callable=greeting
    )

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id="bye", bash_command="echo Goodbye."
    )

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

如要進一步瞭解如何定義 Airflow DAG,請參閱 Airflow 教學課程Airflow 概念

Airflow 運算子

以下是幾個常用 Airflow 運算子的範例。如需 Airflow 運算子的權威性參考資料,請參閱「運算子和鉤子參考資料」和「供應者索引」。

BashOperator

使用 BashOperator 執行指令列程式。

Airflow 2

from airflow.operators import bash

    # Create BigQuery output dataset.
    make_bq_dataset = bash.BashOperator(
        task_id="make_bq_dataset",
        # Executing 'bq' command requires Google Cloud SDK which comes
        # preinstalled in Cloud Composer.
        bash_command=f"bq ls {bq_dataset_name} || bq mk {bq_dataset_name}",
    )

Airflow 1

from airflow.operators import bash_operator

    # Create BigQuery output dataset.
    make_bq_dataset = bash_operator.BashOperator(
        task_id="make_bq_dataset",
        # Executing 'bq' command requires Google Cloud SDK which comes
        # preinstalled in Cloud Composer.
        bash_command=f"bq ls {bq_dataset_name} || bq mk {bq_dataset_name}",
    )

Cloud Composer 會在 Airflow 工作站上執行以 Bash 指令碼提供的指令。工作站是 Debian 式的 Docker 容器,並包含數個套件。

PythonOperator

您可以使用 PythonOperator 執行任意 Python 程式碼。

Cloud Composer 會在包含環境中使用的 Cloud Composer 映像檔版本套件的容器中執行 Python 程式碼。

如要安裝其他 Python 套件,請參閱「安裝 Python 依附元件」一文。

Google Cloud 運算子

如要執行使用 Google Cloud 產品的工作,請使用Google Cloud Airflow 運算子。舉例來說,BigQuery 運算子會在 BigQuery 中查詢及處理資料。

Google Cloud 和 Google Cloud提供的個別服務還有許多其他 Airflow 運算子。如需完整清單,請參閱「Google Cloud 運算子」。

Airflow 2

from airflow.providers.google.cloud.operators import bigquery
from airflow.providers.google.cloud.transfers import bigquery_to_gcs

    bq_recent_questions_query = bigquery.BigQueryInsertJobOperator(
        task_id="bq_recent_questions_query",
        configuration={
            "query": {
                "query": RECENT_QUESTIONS_QUERY,
                "useLegacySql": False,
                "destinationTable": {
                    "projectId": project_id,
                    "datasetId": bq_dataset_name,
                    "tableId": bq_recent_questions_table_id,
                },
            }
        },
        location=location,
    )

Airflow 1

from airflow.contrib.operators import bigquery_operator

    # Query recent StackOverflow questions.
    bq_recent_questions_query = bigquery_operator.BigQueryOperator(
        task_id="bq_recent_questions_query",
        sql="""
        SELECT owner_display_name, title, view_count
        FROM `bigquery-public-data.stackoverflow.posts_questions`
        WHERE creation_date < CAST('{max_date}' AS TIMESTAMP)
            AND creation_date >= CAST('{min_date}' AS TIMESTAMP)
        ORDER BY view_count DESC
        LIMIT 100
        """.format(
            max_date=max_query_date, min_date=min_query_date
        ),
        use_legacy_sql=False,
        destination_dataset_table=bq_recent_questions_table_id,
    )

EmailOperator

您可以使用 EmailOperator 從 DAG 傳送電子郵件。如要從 Cloud Composer 環境傳送電子郵件,請將環境設定為使用 SendGrid

Airflow 2

from airflow.operators import email

    # Send email confirmation (you will need to set up the email operator
    # See https://cloud.google.com/composer/docs/how-to/managing/creating#notification
    # for more info on configuring the email operator in Cloud Composer)
    email_summary = email.EmailOperator(
        task_id="email_summary",
        to="{{var.value.email}}",
        subject="Sample BigQuery notify data ready",
        html_content="""
        Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
        12AM. The most popular question was '{question_title}' with
        {view_count} views. Top 100 questions asked are now available at:
        {export_location}.
        """.format(
            min_date=min_query_date,
            max_date=max_query_date,
            question_title=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][0] }}"
            ),
            view_count=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][1] }}"
            ),
            export_location=output_file,
        ),
    )

Airflow 1

from airflow.operators import email_operator

    # Send email confirmation
    email_summary = email_operator.EmailOperator(
        task_id="email_summary",
        to="{{var.value.email}}",
        subject="Sample BigQuery notify data ready",
        html_content="""
        Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
        12AM. The most popular question was '{question_title}' with
        {view_count} views. Top 100 questions asked are now available at:
        {export_location}.
        """.format(
            min_date=min_query_date,
            max_date=max_query_date,
            question_title=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][0] }}"
            ),
            view_count=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][1] }}"
            ),
            export_location=output_file,
        ),
    )

作業員失敗通知

email_on_failure 設為 True,即可在 DAG 中的運算子失敗時傳送電子郵件通知。如要從 Cloud Composer 環境傳送電子郵件通知,您必須將環境設定為使用 SendGrid

Airflow 2

from airflow import models

default_dag_args = {
    "start_date": yesterday,
    # Email whenever an Operator in the DAG fails.
    "email": "{{var.value.email}}",
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": project_id,
}

with models.DAG(
    "composer_sample_bq_notify",
    schedule_interval=datetime.timedelta(weeks=4),
    default_args=default_dag_args,
) as dag:

Airflow 1

from airflow import models

default_dag_args = {
    "start_date": yesterday,
    # Email whenever an Operator in the DAG fails.
    "email": "{{var.value.email}}",
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": "{{var.value.gcp_project}}",
}

with models.DAG(
    "composer_sample_bq_notify",
    schedule_interval=datetime.timedelta(weeks=4),
    default_args=default_dag_args,
) as dag:

DAG 工作流程指南

  • 請將任何自訂 Python 程式庫放到巢狀目錄的 DAG ZIP 封存檔中,而不要放到 DAG 目錄頂層。

    當 Airflow 掃描 dags/ 資料夾時,Airflow 只會檢查 DAG 資料夾頂層和同樣位在頂層 dags/ 資料夾中的 ZIP 封存檔頂層,看看當中的 Python 模組是否包含 DAG。如果 Airflow 碰到的 Python 模組所屬的 ZIP 封存檔不包含 airflowDAG 子字串,Airflow 就會停止處理該 ZIP 封存檔。Airflow 只會傳回停止處理為止所找到的 DAG。

  • 使用 Airflow 2 而非 Airflow 1。

    Airflow 社群不再為 Airflow 1 發布新的次要版本或修補程式版本。

  • 為了容錯,請勿在相同 Python 模組中定義多個 DAG 物件。

  • 請勿使用子 DAG。請改為在 DAG 中分組工作

  • 請將剖析 DAG 時需要的檔案放在 dags/ 資料夾,而非 data/ 資料夾。

  • 為 DAG 實作單元測試

  • 按照測試 DAG 的操作說明所述,測試開發或修改的 DAG。

  • 確認開發的 DAG 不會過度增加DAG 剖析時間

  • Airflow 工作可能會因多種原因而失敗。為避免整個 DAG 執行作業失敗,建議您啟用工作重試功能。將重試次數上限設為 0 表示不會執行重試。

    建議您覆寫 default_task_retries 選項,使用 0 以外的值來重試工作。此外,您也可以在工作層級設定 retries 參數

  • 如果您想在 Airflow 工作中使用 GPU,請根據使用 GPU 的機器建立個別的 GKE 叢集。使用 GKEStartPodOperator 執行工作。

  • 請勿在叢集的節點集區中執行 CPU 和記憶體密集的作業,因為該集區會執行其他 Airflow 元件 (排程器、工作站、網路伺服器)。請改用 KubernetesPodOperatorGKEStartPodOperator

  • 將 DAG 部署至環境時,請只將解讀及執行 DAG 所需的檔案上傳至 /dags 資料夾。

  • 限制 /dags 資料夾中的 DAG 檔案數量。

    Airflow 會持續剖析 /dags 資料夾中的 DAG。剖析是一種循環處理 DAG 資料夾的程序,而需要載入的檔案數量 (連同其依附元件) 會影響 DAG 剖析和任務排程的效能。使用 100 個檔案 (每個檔案含 100 個 DAG) 比使用 10000 個檔案 (每個檔案含 1 個 DAG) 更有效率,因此建議進行這類最佳化。這項最佳化功能可在解析時間與 DAG 撰寫和管理效率之間取得平衡。

    例如,如果要部署 10000 個 DAG 檔案,您可以考慮建立 100 個 ZIP 檔案,每個檔案包含 100 個 DAG 檔案。

    除了上述提示之外,如果您有超過 10000 個 DAG 檔案,建議以程式輔助方式產生 DAG。舉例來說,您可以實作單一 Python DAG 檔案,產生一定數量的 DAG 物件 (例如 20、100 個 DAG 物件)。

  • 請避免使用已淘汰的 Airflow 運算子。請改為使用最新的替代方案

編寫 DAG 的常見問題

如要在多個 DAG 中執行相同或類似的工作,我該如何降低程式碼重複的情形?

建議您定義程式庫和包裝函式,以減少程式碼重複的情形。

如何在不同 DAG 檔案之間重複使用程式碼?

請將您的公用程式函式放到本機 Python 程式庫中並匯入函式。您可以在環境儲存桶的 dags/ 資料夾中,參照任何 DAG 中的函式。

如何降低產生不同定義的風險?

舉例來說,假設您有兩個團隊要將原始資料匯總成收益指標。這兩個團隊編寫了兩個稍有差異但用途相同的工作。您可以定義程式庫來處理收益資料,這樣 DAG 實作者就必須提供明確的匯總收益定義。

如何設定 DAG 之間的相依性?

這取決於您要如何定義相依性。

如果您有兩個 DAG (DAG A 和 DAG B),並想讓 DAG B 在 DAG A 之後觸發,您可以在 DAG A 的結尾加上 TriggerDagRunOperator

如果 DAG B 只依賴 DAG A 產生的成果 (例如 Pub/Sub 訊息),則或許較適合使用感應器。

如果 DAG B 與 DAG A 緊密整合,您或許能夠將這兩個 DAG 合併為單一 DAG。

如何將專屬執行 ID 傳送至 DAG 及其工作?

舉例來說,假設您要傳送 Dataproc 叢集名稱和檔案路徑。

您可以在 PythonOperator 中傳回 str(uuid.uuid4()),藉此產生隨機專屬 ID。這樣做會將 ID 加到 XComs 中,讓您可在其他運算子中透過範本欄位參照這個 ID。

產生 uuid 之前,請想想看 DagRun 專屬 ID 是否更有幫助。您也可以使用巨集在 Jinja 替換作業中參照這些 ID。

如何分隔 DAG 中的任務?

每項工作都應為整體作業的一部分,並具有冪等性質,因此請避免將多步驟工作流程封裝在單一工作中,例如在 PythonOperator 中執行的複雜程式。

我該在單一 DAG 中定義多項工作,以匯總來自多個來源的資料嗎?

舉例來說,假設您有多個包含原始資料的資料表,並想為每個資料表建立每日匯總數據。這些工作彼此不相關。您應該為每個資料表分別建立一項工作和 DAG 嗎?還是要建立一個通用的 DAG?

如果您允許每項工作共用相同的 DAG 層級屬性 (例如 schedule_interval),那麼在單一 DAG 中定義多項工作就很有意義。否則,為了盡量減少程式碼重複,您可以將多個 DAG 放入模組的 globals(),從單一 Python 模組產生多個 DAG。

如何限制 DAG 中執行的並行工作數量?

例如,您想避免超過 API 使用限制/配額,或避免同時執行太多個程序。

您可以在 Airflow 網頁 UI 中定義 Airflow 集區,並將工作與 DAG 中的現有集區建立關聯。

使用運算子的常見問題

我該使用 DockerOperator 嗎?

除非用於在遠端 Docker 安裝 (不在環境叢集中) 上啟動容器,否則不建議使用 DockerOperator。在 Cloud Composer 環境中,操作員無法存取 Docker 守護程序。

請改用 KubernetesPodOperatorGKEStartPodOperator。這些運算子會分別在 Kubernetes 或 GKE 叢集中啟動 Kubernetes Pod。請注意,我們不建議將 Pod 發布至環境的叢集,因為這可能會導致資源競爭。

我該使用 SubDagOperator 嗎?

我們不建議使用 SubDagOperator

請參考「分組工作」一節的建議,使用其他方法。

我應該只在 PythonOperators 中執行 Python 程式碼,以完全區隔 Python 運算子嗎?

視您的目標而定,您有幾種選項。

如果您只是要維持獨立的 Python 相依性,可以使用 PythonVirtualenvOperator

建議您使用 KubernetesPodOperator。這個運算子可讓您定義 Kubernetes pod,並在其他叢集中執行這些 pod。

如何新增自訂二進位檔或非 PyPI 套件?

您可以安裝私人套件存放區中託管的套件

如何將引數統一傳送至 DAG 及其工作?

您可以使用 Airflow 內建對 Jinja 範本的支援,傳遞可用於範本欄位的引數。

範本替換作業的發生時機為何?

呼叫運算子的 pre_execute 函式之前,系統會在 Airflow 工作站上替換範本。從實際執行的層面來看,即代表系統要到執行工作的前一刻才會替換範本。

如何確認哪些運算子引數支援範本替換作業?

運算子引數如果支援 Jinja2 範本替換作業,就會明確標示。

請查看運算子定義中的 template_fields 欄位,其中包含會經過範本替換作業的引數名稱清單。

例如,請查看 BashOperator,該運算子支援 bash_commandenv 引數的範本。

已淘汰及移除的 Airflow 運算子

下表列出的 Airflow 運算子已淘汰:

  • 避免在 DAG 中使用這些運算子。請改用提供的最新替換運算子。

  • 如果運算子顯示為可用,表示 Cloud Composer 的最新維護版本 (1.20.12) 仍提供這個運算子。

  • 所有版本的 Cloud Composer 1 都不支援部分替換運算子。如要使用這些功能,請考慮升級至 Cloud Composer 3 或 Cloud Composer 2。

已淘汰的運算子 狀態 替換運算子 換貨服務開放時間:
CreateAutoMLTextTrainingJobOperator 適用於 1.20.12 以上版本 SupervisedFineTuningTrainOperator 無法使用替換運算子
GKEDeploymentHook 適用於 1.20.12 以上版本 GKEKubernetesHook 無法使用替換運算子
GKECustomResourceHook 適用於 1.20.12 以上版本 GKEKubernetesHook 無法使用替換運算子
GKEPodHook 適用於 1.20.12 以上版本 GKEKubernetesHook 無法使用替換運算子
GKEJobHook 適用於 1.20.12 以上版本 GKEKubernetesHook 無法使用替換運算子
GKEPodAsyncHook 適用於 1.20.12 以上版本 GKEKubernetesAsyncHook 無法使用替換運算子
SecretsManagerHook 適用於 1.20.12 以上版本 GoogleCloudSecretManagerHook 無法使用替換運算子
BigQueryExecuteQueryOperator 適用於 1.20.12 以上版本 BigQueryInsertJobOperator 適用於 1.20.12 以上版本
BigQueryPatchDatasetOperator 適用於 1.20.12 以上版本 BigQueryUpdateDatasetOperator 適用於 1.20.12 以上版本
DataflowCreateJavaJobOperator 適用於 1.20.12 以上版本 beam.BeamRunJavaPipelineOperator 適用於 1.20.12 以上版本
DataflowCreatePythonJobOperator 適用於 1.20.12 以上版本 beam.BeamRunPythonPipelineOperator 適用於 1.20.12
DataprocSubmitPigJobOperator 適用於 1.20.12 以上版本 DataprocSubmitJobOperator 適用於 1.20.12 以上版本
DataprocSubmitHiveJobOperator 適用於 1.20.12 以上版本 DataprocSubmitJobOperator 適用於 1.20.12 以上版本
DataprocSubmitSparkSqlJobOperator 適用於 1.20.12 以上版本 DataprocSubmitJobOperator 適用於 1.20.12
DataprocSubmitSparkJobOperator 適用於 1.20.12 以上版本 DataprocSubmitJobOperator 適用於 1.20.12 以上版本
DataprocSubmitHadoopJobOperator 適用於 1.20.12 以上版本 DataprocSubmitJobOperator 適用於 1.20.12 以上版本
DataprocSubmitPySparkJobOperator 適用於 1.20.12 以上版本 DataprocSubmitJobOperator 適用於 1.20.12
BigQueryTableExistenceAsyncSensor 適用於 1.20.12 以上版本 BigQueryTableExistenceSensor 無法使用替換運算子
BigQueryTableExistencePartitionAsyncSensor 適用於 1.20.12 以上版本 BigQueryTablePartitionExistenceSensor 無法使用替換運算子
CloudComposerEnvironmentSensor 適用於 1.20.12 以上版本 CloudComposerCreateEnvironmentOperator、CloudComposerDeleteEnvironmentOperator、CloudComposerUpdateEnvironmentOperator 無法使用替換運算子
GCSObjectExistenceAsyncSensor 適用於 1.20.12 以上版本 GCSObjectExistenceSensor 無法使用替換運算子
GoogleAnalyticsHook 適用於 1.20.12 以上版本 GoogleAnalyticsAdminHook 無法使用替換運算子
GoogleAnalyticsListAccountsOperator 適用於 1.20.12 以上版本 GoogleAnalyticsAdminListAccountsOperator 無法使用替換運算子
GoogleAnalyticsGetAdsLinkOperator 適用於 1.20.12 以上版本 GoogleAnalyticsAdminGetGoogleAdsLinkOperator 無法使用替換運算子
GoogleAnalyticsRetrieveAdsLinksListOperator 適用於 1.20.12 以上版本 GoogleAnalyticsAdminListGoogleAdsLinksOperator 無法使用替換運算子
GoogleAnalyticsDataImportUploadOperator 適用於 1.20.12 以上版本 GoogleAnalyticsAdminCreateDataStreamOperator 無法使用替換運算子
GoogleAnalyticsDeletePreviousDataUploadsOperator 適用於 1.20.12 以上版本 GoogleAnalyticsAdminDeleteDataStreamOperator 無法使用替換運算子
DataPipelineHook 適用於 1.20.12 以上版本 DataflowHook 無法使用替換運算子
CreateDataPipelineOperator 適用於 1.20.12 以上版本 DataflowCreatePipelineOperator 無法使用替換運算子
RunDataPipelineOperator 適用於 1.20.12 以上版本 DataflowRunPipelineOperator 無法使用替換運算子
AutoMLDatasetLink 適用於 1.20.12 以上版本 TranslationLegacyDatasetLink 無法使用替換運算子
AutoMLDatasetListLink 適用於 1.20.12 以上版本 TranslationDatasetListLink 無法使用替換運算子
AutoMLModelLink 適用於 1.20.12 以上版本 TranslationLegacyModelLink 無法使用替換運算子
AutoMLModelTrainLink 適用於 1.20.12 以上版本 TranslationLegacyModelTrainLink 無法使用替換運算子
AutoMLModelPredictLink 適用於 1.20.12 以上版本 TranslationLegacyModelPredictLink 無法使用替換運算子
AutoMLBatchPredictOperator 適用於 1.20.12 以上版本 vertex_ai.batch_prediction_job 無法使用替換運算子
AutoMLPredictOperator 適用於 1.20.12 以上版本 vertex_aigenerative_model。TextGenerationModelPredictOperator、translate.TranslateTextOperator 無法使用替換運算子
PromptLanguageModelOperator 適用於 1.20.12 以上版本 TextGenerationModelPredictOperator 無法使用替換運算子
GenerateTextEmbeddingsOperator 適用於 1.20.12 以上版本 TextEmbeddingModelGetEmbeddingsOperator 無法使用替換運算子
PromptMultimodalModelOperator 適用於 1.20.12 以上版本 GenerativeModelGenerateContentOperator 無法使用替換運算子
PromptMultimodalModelWithMediaOperator 適用於 1.20.12 以上版本 GenerativeModelGenerateContentOperator 無法使用替換運算子
DataflowStartSqlJobOperator 適用於 1.20.12 以上版本 DataflowStartYamlJobOperator 無法使用替換運算子
LifeSciencesHook 適用於 1.20.12 以上版本 Google Cloud 批次運算子的掛鉤 尚未公布
DataprocScaleClusterOperator 適用於 1.20.12 以上版本 DataprocUpdateClusterOperator 尚未公布
MLEngineStartBatchPredictionJobOperator 適用於 1.20.12 以上版本 CreateBatchPredictionJobOperator 尚未公布
MLEngineManageModelOperator 適用於 1.20.12 以上版本 MLEngineCreateModelOperator、MLEngineGetModelOperator 尚未公布
MLEngineGetModelOperator 適用於 1.20.12 以上版本 GetModelOperator 尚未公布
MLEngineDeleteModelOperator 適用於 1.20.12 以上版本 DeleteModelOperator 尚未公布
MLEngineManageVersionOperator 適用於 1.20.12 以上版本 MLEngineCreateVersion、MLEngineSetDefaultVersion、MLEngineListVersions、MLEngineDeleteVersion 尚未公布
MLEngineCreateVersionOperator 適用於 1.20.12 以上版本 Vertex AI 運算子的 parent_model 參數 尚未公布
MLEngineSetDefaultVersionOperator 適用於 1.20.12 以上版本 SetDefaultVersionOnModelOperator 尚未公布
MLEngineListVersionsOperator 適用於 1.20.12 以上版本 ListModelVersionsOperator 尚未公布
MLEngineDeleteVersionOperator 適用於 1.20.12 以上版本 DeleteModelVersionOperator 尚未公布
MLEngineStartTrainingJobOperator 適用於 1.20.12 以上版本 CreateCustomPythonPackageTrainingJobOperator 尚未公布
MLEngineTrainingCancelJobOperator 適用於 1.20.12 以上版本 CancelCustomTrainingJobOperator 尚未公布
LifeSciencesRunPipelineOperator 適用於 1.20.12 以上版本 Google Cloud Batch 運算子 尚未公布
MLEngineCreateModelOperator 適用於 1.20.12 以上版本 對應的 VertexAI 運算子 尚未公布

後續步驟