Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
本頁面說明如何使用 KubernetesPodOperator,將 Cloud Composer 中的 Kubernetes Pods 部署至 Google Kubernetes Engine 叢集。
KubernetesPodOperator 會在環境叢集中啟動 Kubernetes Pods。相比之下,Google Kubernetes Engine 操作員會在指定叢集中執行 Kubernetes Pod,該叢集可以是與環境無關的獨立叢集。您也可以使用 Google Kubernetes Engine 運算子建立及刪除叢集。
如果您需要下列功能,KubernetesPodOperator 會是理想選擇:
- 無法透過公開 PyPI 存放區取得的自訂 Python 依附元件。
- 在原始 Cloud Composer 工作站映像檔中無法使用的二進位依附元件。
事前準備
如果使用 CNCF Kubernetes Provider 的 5.0.0 版,請按照 CNCF Kubernetes Provider 專節中的操作說明進行。
Cloud Composer 2 不支援 Pod 關聯設定。如果您想使用 Pod 相依性,請改用 GKE 運算子,在其他叢集中啟動 Pod。
關於 Cloud Composer 2 中的 KubernetesPodOperator
本節說明 KubernetesPodOperator 在 Cloud Composer 2 中的運作方式。
資源使用情況
在 Cloud Composer 2 中,環境的叢集會自動調整資源配置。您使用 KubernetesPodOperator 執行的額外工作負載,會獨立於環境進行擴充。
環境不會受到資源需求增加的影響,但環境的叢集會根據資源需求進行升降。
在環境叢集中執行的額外工作負載,其定價會採用 Cloud Composer 2 定價模式,並使用 Cloud Composer 運算 SKU。
Cloud Composer 2 使用 Autopilot 叢集,其中引入了運算類別的概念:
Cloud Composer 僅支援
general-purpose
運算類別。根據預設,如果您未選取任何類別,則在使用 KubernetesPodOperator 建立 Pod 時,系統會假設為
general-purpose
類別。每個類別都與特定屬性和資源限制相關聯,您可以參閱 Autopilot 說明文件瞭解相關資訊。舉例來說,在
general-purpose
類別中執行的 Pod 最多可使用 110 GiB 記憶體。
存取專案資源
Cloud Composer 2 會使用 GKE 叢集,並搭配 Workload Identity Federation for GKE。在 composer-user-workloads
命名空間中執行的 Pod 可以存取專案中的 Google Cloud 資源,無須額外設定。環境的服務帳戶會用於存取這些資源。
如果您想使用自訂命名空間,則與此命名空間相關聯的 Kubernetes 服務帳戶必須對應至 Google Cloud 服務帳戶,才能為 Google API 和其他服務的要求啟用服務身分授權。如果您在環境叢集中的自訂命名空間中執行 Pod,則不會建立 Kubernetes 和Google Cloud 服務帳戶之間的 IAM 繫結,這些 Pod 也無法存取您的 Google Cloud 專案資源。
如果您使用自訂命名空間,且希望 Pod 能存取Google Cloud 資源,請按照「Workload Identity Federation for GKE」中的指示設定自訂命名空間的繫結:
- 在環境叢集中建立不同的命名空間。
- 在自訂命名空間 Kubernetes 服務帳戶和環境的服務帳戶之間建立繫結。
- 將環境的服務帳戶註解加入 Kubernetes 服務帳戶。
- 使用 KubernetesPodOperator 時,請在
namespace
和service_account_name
參數中指定命名空間和 Kubernetes 服務帳戶。
希望將設定需求減至最低
如要建立 KubernetesPodOperator,只需要 Pod 的 name
、要使用的 image
,以及 task_id
參數。/home/airflow/composer_kube_config
包含用於向 GKE 進行驗證的憑證。
額外設定
本例顯示可在 KubernetesPodOperator 中設定的其他參數。
詳情請參閱下列資源:
如要瞭解如何使用 Kubernetes 密鑰和 ConfigMap,請參閱「使用 Kubernetes 密鑰和 ConfigMap」。
如要進一步瞭解如何在 KubernetesPodOperator 中使用 Jinja 範本,請參閱「使用 Jinja 範本」。
如要瞭解 KubernetesPodOperator 參數,請參閱 Airflow 說明文件中的運算子參考資料。
使用 Jinja 範本
Airflow 支援 DAG 中的 Jinja 範本。
您必須使用運算子宣告必要的 Airflow 參數 (task_id
、name
和 image
)。如以下範例所示,您可以使用 Jinja 建立所有其他參數的範本,包括 cmds
、arguments
、env_vars
和 config_file
。
範例中的 env_vars
參數是從名為 my_value
的 Airflow 變數設定。範例 DAG 會從 Airflow 中的 vars
範本變數取得值。Airflow 提供更多變數,可存取不同類型的資訊。舉例來說,您可以使用 conf
範本變數存取 Airflow 設定選項的值。如需詳細資訊和 Airflow 中可用的變數清單,請參閱 Airflow 說明文件中的「範本參考資料」。
如果不變更 DAG 或建立 env_vars
變數,範例中的 ex-kube-templates
工作會失敗,因為變數不存在。您可以在 Airflow 使用者介面或 Google Cloud CLI 中建立這個變數:
Airflow UI
前往 Airflow UI。
在工具列中,依序選取「管理」>「變數」。
在「清單變數」頁面中,按一下「新增記錄」。
在「新增變數」頁面中輸入下列資訊:
- 鍵:
my_value
- Val:
example_value
- 鍵:
按一下 [儲存]。
gcloud
輸入下列指令:
gcloud composer environments run ENVIRONMENT \
--location LOCATION \
variables set -- \
my_value example_value
取代:
- 將
ENVIRONMENT
替換為環境的名稱。 LOCATION
改成環境所在的地區。
以下範例說明如何搭配 KubernetesPodOperator 使用 Jinja 範本:
使用 Kubernetes Secret 和 ConfigMap
Kubernetes Secret 是包含機密資料的物件。Kubernetes ConfigMap 是物件,可在鍵/值組合中包含非機密資料。
在 Cloud Composer 2 中,您可以使用 Google Cloud CLI、API 或 Terraform 建立機密資料和 ConfigMap,然後透過 KubernetesPodOperator 存取。
關於 YAML 設定檔
使用 Google Cloud CLI 和 API 建立 Kubernetes 密鑰或 ConfigMap 時,您必須提供 YAML 格式的檔案。這個檔案必須遵循 Kubernetes 密鑰和 ConfigMap 使用的格式。Kubernetes 說明文件提供許多 ConfigMap 和 Secret 的程式碼範例。如要開始使用,請參閱「使用 Secret 安全地發布憑證」頁面和 ConfigMaps。
與 Kubernetes 密鑰相同,在密鑰中定義值時,請使用 Base64 表示法。
如要編碼值,您可以使用下列指令 (這是取得 Base64 編碼值的眾多方法之一):
echo "postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db" -n | base64
輸出:
cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==
以下兩個 YAML 檔案範例會在本指南後續的範例中使用。Kubernetes 機密資料的 YAML 設定檔範例:
apiVersion: v1
kind: Secret
metadata:
name: airflow-secrets
data:
sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==
以下是另一個示例,說明如何納入檔案。與前一個範例相同,請先對檔案內容進行編碼 (cat ./key.json | base64
),然後在 YAML 檔案中提供這個值:
apiVersion: v1
kind: Secret
metadata:
name: service-account
data:
service-account.json: |
ewogICJ0eXBl...mdzZXJ2aWNlYWNjb3VudC5jb20iCn0K
ConfigMap 的 YAML 設定檔範例。您不需要在 ConfigMap 中使用 base64 表示法:
apiVersion: v1
kind: ConfigMap
metadata:
name: example-configmap
data:
example_key: example_value
管理 Kubernetes 密鑰
在 Cloud Composer 2 中,您可以使用 Google Cloud CLI 和 kubectl
建立機密:
取得環境叢集的相關資訊:
執行下列指令:
gcloud composer environments describe ENVIRONMENT \ --location LOCATION \ --format="value(config.gkeCluster)"
取代:
ENVIRONMENT
改為環境名稱。LOCATION
與 Cloud Composer 環境所在的地區。
這個指令的輸出格式如下:
projects/<your-project-id>/locations/<location-of-composer-env>/clusters/<your-cluster-id>
。如要取得 GKE 叢集 ID,請複製
/clusters/
後方的輸出內容 (結尾為-gke
)。
使用下列指令連線至 GKE 叢集:
gcloud container clusters get-credentials CLUSTER_ID \ --project PROJECT \ --region LOCATION
更改下列內容:
CLUSTER_ID
:環境的叢集 ID。PROJECT_ID
:專案 ID。LOCATION
:環境所在的地區。
建立 Kubernetes 密鑰:
以下指令示範兩種建立 Kubernetes 機密值的方法。
--from-literal
方法會使用鍵/值組合。--from-file
方法會使用檔案內容。如要透過提供鍵/值組合建立 Kubernetes 密鑰,請執行下列指令。這個範例會建立名為
airflow-secrets
的 Secret,其中包含值為test_value
的sql_alchemy_conn
欄位。kubectl create secret generic airflow-secrets \ --from-literal sql_alchemy_conn=test_value -n composer-user-workloads
如要透過提供檔案內容建立 Kubernetes 密鑰,請執行下列指令。這個範例會建立名為
service-account
的 Secret,其中的service-account.json
欄位會從本機./key.json
檔案的內容擷取值。kubectl create secret generic service-account \ --from-file service-account.json=./key.json -n composer-user-workloads
在 DAG 中使用 Kubernetes Secret
本範例說明使用 Kubernetes Secret 的兩種方式:做為環境變數,以及做為 Pod 掛載的磁碟區。
第一個密鑰 airflow-secrets
會設為名為 SQL_CONN
的 Kubernetes 環境變數 (而非 Airflow 或 Cloud Composer 環境變數)。
第二個密鑰 service-account
會將 service-account.json
掛載至 /var/secrets/google
,service-account.json
是含有服務帳戶權杖的檔案。
密鑰物件如下所示:
第一個 Kubernetes Secret 的名稱會在 secret_env
變數中定義。這個 Secret 的名稱為 airflow-secrets
。deploy_type
參數會指定該參數必須以環境變數形式公開。環境變數的名稱為 SQL_CONN
,如 deploy_target
參數所指定。最後,將 SQL_CONN
環境變數的值設為 sql_alchemy_conn
鍵的值。
在 secret_volume
變數中定義第二個 Kubernetes 密鑰的名稱。這個 Secret 的名稱為 service-account
。系統會將其公開為音量,如 deploy_type
參數所指定。要掛載的檔案路徑 deploy_target
為 /var/secrets/google
。最後,儲存在 deploy_target
中的 Secret 的 key
為 service-account.json
。
運算子設定如下所示:
CNCF Kubernetes 供應商相關資訊
KubernetesPodOperator 是在 apache-airflow-providers-cncf-kubernetes
供應器中實作。
如要查看 CNCF Kubernetes 供應商的詳細版本資訊,請前往 CNCF Kubernetes 供應商網站。
6.0.0 版
在 CNCF Kubernetes Provider 套件的 6.0.0 版中,KubernetesPodOperator 預設會使用 kubernetes_default
連線。
如果您在 5.0.0 版中指定自訂連線,運算子仍會使用這項自訂連線。如要改回使用 kubernetes_default
連線,建議您調整 DAG。
5.0.0 版
與 4.4.0 版本相比,這個版本引入了一些回溯不相容的變更。其中最重要的是與 kubernetes_default
連線相關,但 5.0.0 版並未使用這項連線。
kubernetes_default
連線需要修改。Kubernetes 設定路徑必須設為/home/airflow/composer_kube_config
(如下圖所示)。或者,您必須將config_file
新增至 KubernetesPodOperator 設定 (如以下程式碼範例所示)。

- 請按照下列方式修改使用 KubernetesPodOperator 的任務程式碼:
KubernetesPodOperator(
# config_file parameter - can be skipped if connection contains this setting
config_file="/home/airflow/composer_kube_config",
# definition of connection to be used by the operator
kubernetes_conn_id='kubernetes_default',
...
)
如要進一步瞭解 5.0.0 版,請參閱 CNCF Kubernetes Provider Release Notes。
疑難排解
本節提供常見 KubernetesPodOperator 問題的疑難排解建議:
查看記錄
排解問題時,您可以按照以下順序查看記錄:
Airflow 工作記錄:
前往 Google Cloud 控制台的「Environments」頁面。
在環境清單中,按一下環境名稱。「環境詳細資料」頁面隨即開啟。
前往「DAG」分頁。
按一下 DAG 名稱,然後點選 DAG 執行作業,即可查看詳細資料和記錄。
Airflow 排程器記錄:
前往「環境詳細資料」頁面。
前往「Logs」分頁。
檢查 Airflow 排程器記錄。
Google Cloud 控制台的 GKE 工作負載下方有 Pod 記錄檔。這些記錄包括 Pod 定義 YAML 檔案、Pod 事件和 Pod 詳細資料。
非零值的傳回代碼
使用 KubernetesPodOperator (和 GKEStartPodOperator) 時,容器進入點的傳回代碼會決定工作是否視為成功。非零值的傳回代碼表示失敗。
常見的模式是執行殼層指令碼做為容器進入點,以便將容器內的多項作業分組。
如果您要編寫這類指令碼,建議您在指令碼頂端加入 set -e
指令,以便指令碼中的失敗指令終止指令碼,並將失敗情形傳播至 Airflow 工作例項。
Pod 逾時
KubernetesPodOperator 的預設逾時時間為 120 秒,可能會導致在下載較大圖片之前發生逾時。建立 KubernetesPodOperator 時,您可以變更 startup_timeout_seconds
參數來增加逾時時間。
Pod 逾時時,Airflow UI 會顯示專屬於該工作記錄。例如:
Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 27, in <module>
args.func(args)
File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
pool=args.pool,
File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
result = func(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start
如果 Cloud Composer 服務帳戶缺少執行當前任務所需的 IAM 權限,也會發生 Pod 逾時。如要確認這項資訊,請使用 GKE 資訊主頁查看 Pod 層級錯誤,以便查看特定工作負載的記錄檔,或使用 Cloud Logging。
無法建立新連線
GKE 叢集預設會啟用自動升級功能。如果節點集區位於升級中的叢集中,您可能會看到以下錯誤:
<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused
如要確認叢集是否正在升級,請在 Google Cloud 控制台中前往「Kubernetes clusters」(Kubernetes 叢集) 頁面,然後查看環境叢集名稱旁邊的載入圖示。