Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
本頁面說明如何使用 KubernetesPodOperator,將 Cloud Composer 中的 Kubernetes Pods 部署至 Google Kubernetes Engine 叢集。
KubernetesPodOperator 會在環境叢集中啟動 Kubernetes Pods。相比之下,Google Kubernetes Engine 操作員會在指定叢集中執行 Kubernetes Pod,該叢集可以是與環境無關的獨立叢集。您也可以使用 Google Kubernetes Engine 運算子建立及刪除叢集。
如果您需要下列功能,KubernetesPodOperator 會是理想選擇:
- 無法透過公開 PyPI 存放區取得的自訂 Python 依附元件。
- 在原始 Cloud Composer 工作站映像檔中無法使用的二進位依附元件。
事前準備
- 建議您使用最新版本的 Cloud Composer。至少必須支援這個版本,做為淘汰和支援政策的一部分。
- 確認環境有足夠的資源。將 Pod 啟動至資源不足的環境,可能會導致 Airflow 工作站和 Airflow 排程器發生錯誤。
設定 Cloud Composer 環境資源
建立 Cloud Composer 環境時,您必須指定其效能參數,包括環境叢集的效能參數。將 Kubernetes Pod 啟動至環境叢集,可能會導致叢集資源 (例如 CPU 或記憶體) 競爭。由於 Airflow 排程器和工作站位於同一個 GKE 叢集,如果競爭導致資源耗盡,排程器和工作站就無法正常運作。
如要避免資源耗盡,請採取下列一或多項操作:
- (建議做法) 建立節點集區
- 增加環境中的節點數量
- 指定適當的機器類型
建立節點集區
如要避免 Cloud Composer 環境中的資源耗盡,建議您建立新的節點集區,並設定 Kubernetes Pod,只使用該集區的資源執行。
主控台
gcloud
判斷環境叢集的名稱:
gcloud composer environments describe ENVIRONMENT_NAME \ --location LOCATION \ --format="value(config.gkeCluster)"
取代:
- 將
ENVIRONMENT_NAME
替換為環境的名稱。 LOCATION
改成環境所在的地區。
- 將
輸出結果包含環境叢集的名稱。例如
europe-west3-example-enviro-af810e25-gke
。按照「新增節點集區」一文的指示建立節點集區。
增加環境中的節點數
增加 Cloud Composer 環境中的節點數量,可增加工作負載可用的運算能力。這項增加作業不會為需要比指定機器類型提供的 CPU 或 RAM 更多資源的任務提供額外資源。
如要增加節點數,請更新環境。
指定適當的機器類型
在 Cloud Composer 建立環境時,您可以指定機器類型。為確保資源可用,請針對 Cloud Composer 環境中發生的運算類型指定機器類型。
希望將設定需求減至最低
如要建立 KubernetesPodOperator,只需要 Pod 的 name
、要使用的 image
,以及 task_id
參數。/home/airflow/composer_kube_config
包含用於向 GKE 進行驗證的憑證。
Airflow 2
Airflow 1
Pod 相依性設定
在 KubernetesPodOperator 中設定 affinity
參數時,您可以控制要將 Pod 排程到哪些節點,例如只在特定節點集區中的節點。在這個範例中,運算子只會在名為 pool-0
和 pool-1
的節點集區上執行。Cloud Composer 1 環境節點位於 default-pool
中,因此 Pod 不會在環境中的節點上執行。
Airflow 2
Airflow 1
由於範例已設定,因此工作會失敗。查看記錄後,您會發現由於節點集區 pool-0
和 pool-1
不存在,因此任務會失敗。
如要確保 values
中的節點集區存在,請進行下列任一設定變更:
如果您先前已建立節點集區,請將
pool-0
和pool-1
替換為節點集區的名稱,然後再次上傳 DAG。建立名為
pool-0
或pool-1
的節點集區。您可以同時建立這兩個,但任務只需要其中一個即可成功。將
pool-0
和pool-1
替換為default-pool
,這是 Airflow 使用的預設集區。然後再次上傳 DAG。
變更完成後,請稍候幾分鐘,讓環境更新。然後再次執行 ex-pod-affinity
工作,並確認 ex-pod-affinity
工作是否成功。
額外設定
本例顯示可在 KubernetesPodOperator 中設定的其他參數。
詳情請參閱下列資源:
如要瞭解如何使用 Kubernetes 密鑰和 ConfigMap,請參閱「使用 Kubernetes 密鑰和 ConfigMap」。
如要進一步瞭解如何在 KubernetesPodOperator 中使用 Jinja 範本,請參閱「使用 Jinja 範本」。
如要瞭解 KubernetesPodOperator 參數,請參閱 Airflow 說明文件中的運算子參考資料。
Airflow 2
Airflow 1
使用 Jinja 範本
Airflow 支援 DAG 中的 Jinja 範本。
您必須使用運算子宣告必要的 Airflow 參數 (task_id
、name
和 image
)。如以下範例所示,您可以使用 Jinja 建立所有其他參數的範本,包括 cmds
、arguments
、env_vars
和 config_file
。
範例中的 env_vars
參數是從名為 my_value
的 Airflow 變數設定。範例 DAG 會從 Airflow 中的 vars
範本變數取得值。Airflow 提供更多變數,可存取不同類型的資訊。舉例來說,您可以使用 conf
範本變數存取 Airflow 設定選項的值。如需詳細資訊和 Airflow 中可用的變數清單,請參閱 Airflow 說明文件中的「範本參考資料」。
如果不變更 DAG 或建立 env_vars
變數,範例中的 ex-kube-templates
工作會失敗,因為變數不存在。您可以在 Airflow 使用者介面或 Google Cloud CLI 中建立這個變數:
Airflow UI
前往 Airflow UI。
在工具列中,依序選取「管理」>「變數」。
在「清單變數」頁面中,按一下「新增記錄」。
在「新增變數」頁面中輸入下列資訊:
- 鍵:
my_value
- Val:
example_value
- 鍵:
按一下 [儲存]。
如果您的環境使用 Airflow 1,請改為執行下列指令:
前往 Airflow UI。
在工具列中,依序選取「管理」>「變數」。
在「變數」頁面中,按一下「建立」分頁。
在「變數」頁面中輸入下列資訊:
- 鍵:
my_value
- Val:
example_value
- 鍵:
按一下 [儲存]。
gcloud
輸入下列指令:
gcloud composer environments run ENVIRONMENT \
--location LOCATION \
variables set -- \
my_value example_value
如果您的環境使用 Airflow 1,請改為執行下列指令:
gcloud composer environments run ENVIRONMENT \
--location LOCATION \
variables -- \
--set my_value example_value
取代:
- 將
ENVIRONMENT
替換為環境的名稱。 LOCATION
改成環境所在的地區。
以下範例說明如何搭配 KubernetesPodOperator 使用 Jinja 範本:
Airflow 2
Airflow 1
使用 Kubernetes Secret 和 ConfigMap
Kubernetes Secret 是包含機密資料的物件。Kubernetes ConfigMap 是物件,可在鍵/值組合中包含非機密資料。
在 Cloud Composer 2 中,您可以使用 Google Cloud CLI、API 或 Terraform 建立機密資料和 ConfigMap,然後透過 KubernetesPodOperator 存取。
關於 YAML 設定檔
使用 Google Cloud CLI 和 API 建立 Kubernetes 密鑰或 ConfigMap 時,您必須提供 YAML 格式的檔案。這個檔案必須遵循 Kubernetes 密鑰和 ConfigMap 使用的格式。Kubernetes 說明文件提供許多 ConfigMap 和 Secret 的程式碼範例。如要開始使用,請參閱「使用 Secret 安全地發布憑證」頁面和 ConfigMaps。
與 Kubernetes 密鑰相同,在密鑰中定義值時,請使用 Base64 表示法。
如要編碼值,您可以使用下列指令 (這是取得 Base64 編碼值的眾多方法之一):
echo "postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db" -n | base64
輸出:
cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==
以下兩個 YAML 檔案範例會在本指南後續的範例中使用。Kubernetes 機密資料的 YAML 設定檔範例:
apiVersion: v1
kind: Secret
metadata:
name: airflow-secrets
data:
sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==
以下是另一個示例,說明如何納入檔案。與前一個範例相同,請先對檔案內容進行編碼 (cat ./key.json | base64
),然後在 YAML 檔案中提供這個值:
apiVersion: v1
kind: Secret
metadata:
name: service-account
data:
service-account.json: |
ewogICJ0eXBl...mdzZXJ2aWNlYWNjb3VudC5jb20iCn0K
ConfigMap 的 YAML 設定檔範例。您不需要在 ConfigMap 中使用 base64 表示法:
apiVersion: v1
kind: ConfigMap
metadata:
name: example-configmap
data:
example_key: example_value
管理 Kubernetes 密鑰
在 Cloud Composer 2 中,您可以使用 Google Cloud CLI 和 kubectl
建立機密:
取得環境叢集的相關資訊:
執行下列指令:
gcloud composer environments describe ENVIRONMENT \ --location LOCATION \ --format="value(config.gkeCluster)"
取代:
ENVIRONMENT
改為環境名稱。LOCATION
與 Cloud Composer 環境所在的地區。
這個指令的輸出格式如下:
projects/<your-project-id>/zones/<zone-of-composer-env>/clusters/<your-cluster-id>
。如要取得 GKE 叢集 ID,請複製
/clusters/
後方的輸出內容 (結尾為-gke
)。如要取得區域,請複製
/zones/
後面的輸出內容。
使用下列指令連線至 GKE 叢集:
gcloud container clusters get-credentials CLUSTER_ID \ --project PROJECT \ --zone ZONE
取代:
CLUSTER_ID
:環境的叢集 ID。PROJECT_ID
:專案 ID。ZONE
替換成環境叢集所在的區域。
建立 Kubernetes 密鑰:
以下指令示範兩種建立 Kubernetes 機密值的方法。
--from-literal
方法會使用鍵/值組合。--from-file
方法會使用檔案內容。如要透過提供鍵/值組合建立 Kubernetes 密鑰,請執行下列指令。這個範例會建立名為
airflow-secrets
的 Secret,其中包含值為test_value
的sql_alchemy_conn
欄位。kubectl create secret generic airflow-secrets \ --from-literal sql_alchemy_conn=test_value
如要透過提供檔案內容建立 Kubernetes 密鑰,請執行下列指令。這個範例會建立名為
service-account
的 Secret,其中的service-account.json
欄位會從本機./key.json
檔案的內容擷取值。kubectl create secret generic service-account \ --from-file service-account.json=./key.json
在 DAG 中使用 Kubernetes Secret
本範例說明使用 Kubernetes Secret 的兩種方式:做為環境變數,以及做為 Pod 掛載的磁碟區。
第一個密鑰 airflow-secrets
會設為名為 SQL_CONN
的 Kubernetes 環境變數 (而非 Airflow 或 Cloud Composer 環境變數)。
第二個密鑰 service-account
會將 service-account.json
掛載至 /var/secrets/google
,service-account.json
是含有服務帳戶權杖的檔案。
密鑰物件如下所示:
Airflow 2
Airflow 1
第一個 Kubernetes Secret 的名稱會在 secret_env
變數中定義。這個 Secret 的名稱為 airflow-secrets
。deploy_type
參數會指定該參數必須以環境變數形式公開。環境變數的名稱為 SQL_CONN
,如 deploy_target
參數所指定。最後,將 SQL_CONN
環境變數的值設為 sql_alchemy_conn
鍵的值。
在 secret_volume
變數中定義第二個 Kubernetes 密鑰的名稱。這個 Secret 的名稱為 service-account
。系統會將其公開為音量,如 deploy_type
參數所指定。要掛載的檔案路徑 deploy_target
為 /var/secrets/google
。最後,儲存在 deploy_target
中的 Secret 的 key
為 service-account.json
。
運算子設定如下所示:
Airflow 2
Airflow 1
CNCF Kubernetes 供應商相關資訊
KubernetesPodOperator 是在 apache-airflow-providers-cncf-kubernetes
供應器中實作。
如要查看 CNCF Kubernetes 供應商的詳細版本資訊,請前往 CNCF Kubernetes 供應商網站。
6.0.0 版
在 CNCF Kubernetes Provider 套件的 6.0.0 版中,KubernetesPodOperator 預設會使用 kubernetes_default
連線。
如果您在 5.0.0 版中指定自訂連線,運算子仍會使用這項自訂連線。如要改回使用 kubernetes_default
連線,建議您調整 DAG。
5.0.0 版
與 4.4.0 版本相比,這個版本引入了一些回溯不相容的變更。其中最重要的是與 kubernetes_default
連線相關,但 5.0.0 版並未使用這項連線。
kubernetes_default
連線需要修改。Kubernetes 設定路徑必須設為/home/airflow/composer_kube_config
(如下圖所示)。或者,您必須將config_file
新增至 KubernetesPodOperator 設定 (如以下程式碼範例所示)。

- 請按照下列方式修改使用 KubernetesPodOperator 的任務程式碼:
KubernetesPodOperator(
# config_file parameter - can be skipped if connection contains this setting
config_file="/home/airflow/composer_kube_config",
# definition of connection to be used by the operator
kubernetes_conn_id='kubernetes_default',
...
)
如要進一步瞭解 5.0.0 版,請參閱 CNCF Kubernetes Provider Release Notes。
疑難排解
本節提供常見 KubernetesPodOperator 問題的疑難排解建議:
查看記錄
排解問題時,您可以按照以下順序查看記錄:
Airflow 工作記錄:
前往 Google Cloud 控制台的「Environments」頁面。
在環境清單中,按一下環境名稱。「環境詳細資料」頁面隨即開啟。
前往「DAG」分頁。
按一下 DAG 名稱,然後點選 DAG 執行作業,即可查看詳細資料和記錄。
Airflow 排程器記錄:
前往「環境詳細資料」頁面。
前往「Logs」分頁。
檢查 Airflow 排程器記錄。
Google Cloud 控制台的 GKE 工作負載下方有 Pod 記錄檔。這些記錄包括 Pod 定義 YAML 檔案、Pod 事件和 Pod 詳細資料。
非零值的傳回代碼
使用 KubernetesPodOperator (和 GKEStartPodOperator) 時,容器進入點的傳回代碼會決定工作是否視為成功。非零值的傳回代碼表示失敗。
常見的模式是執行殼層指令碼做為容器進入點,以便將容器內的多項作業分組。
如果您要編寫這類指令碼,建議您在指令碼頂端加入 set -e
指令,以便指令碼中的失敗指令終止指令碼,並將失敗情形傳播至 Airflow 工作例項。
Pod 逾時
KubernetesPodOperator 的預設逾時時間為 120 秒,可能會導致在下載較大圖片之前發生逾時。建立 KubernetesPodOperator 時,您可以變更 startup_timeout_seconds
參數來增加逾時時間。
Pod 逾時時,Airflow UI 會顯示專屬於該工作記錄。例如:
Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 27, in <module>
args.func(args)
File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
pool=args.pool,
File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
result = func(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start
如果 Cloud Composer 服務帳戶缺少執行當前任務所需的 IAM 權限,也會發生 Pod 逾時。如要確認這項資訊,請使用 GKE 資訊主頁查看 Pod 層級錯誤,以便查看特定工作負載的記錄檔,或使用 Cloud Logging。
無法建立新連線
GKE 叢集預設會啟用自動升級功能。如果節點集區位於升級中的叢集中,您可能會看到以下錯誤:
<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused
如要確認叢集是否正在升級,請在 Google Cloud 控制台中前往「Kubernetes clusters」(Kubernetes 叢集) 頁面,然後查看環境叢集名稱旁邊的載入圖示。