KubernetesPodOperator 사용

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

이 페이지에서는 KubernetesPodOperator를 사용하여 Cloud Composer에서 Cloud Composer 환경의 일부인 Google Kubernetes Engine 클러스터로 Kubernetes 포드를 배포하는 방법을 설명합니다.

KubernetesPodOperator는 사용자 환경의 클러스터에서 Kubernetes 포드를 실행합니다. 이에 비해 Google Kubernetes Engine 연산자는 지정된 클러스터에서 Kubernetes 포드를 실행합니다. 이 클러스터는 환경과 관련이 없는 별도의 클러스터일 수 있습니다. Google Kubernetes Engine 연산자를 사용하여 클러스터를 만들고 삭제할 수도 있습니다.

KubernetesPodOperator는 다음이 필요한 경우에 적합한 옵션입니다.

  • 공개 PyPI 저장소를 통해 사용할 수 없는 커스텀 Python 종속 항목
  • 스톡 Cloud Composer 작업자 이미지에서 사용할 수 없는 바이너리 종속 항목

시작하기 전에

Cloud Composer 3과 Cloud Composer 2 간에 KubernetesPodOperator의 차이점 목록을 확인하고 DAG가 호환되는지 확인합니다.

  • Cloud Composer 3에서는 커스텀 네임스페이스를 만들 수 없습니다. 포드는 다른 네임스페이스가 지정된 경우에도 항상 composer-user-workloads 네임스페이스에서 실행됩니다. 이 네임스페이스에서 포드는 추가 구성 없이 프로젝트의 리소스와 VPC 네트워크(사용 설정된 경우)에 액세스할 수 있습니다.

  • Kubernetes API를 사용하여 Kubernetes 보안 비밀 및 ConfigMap을 만들 수 없습니다. 대신 Cloud Composer는 Kubernetes 보안 비밀 및 ConfigMap을 관리하기 위해 Google Cloud CLI 명령어, Terraform 리소스, Cloud Composer API를 제공합니다. 자세한 내용은 Kubernetes 보안 비밀 및 ConfigMap 사용을 참조하세요.

  • Cloud Composer 2에서와 마찬가지로 포드 어피니티 구성은 사용할 수 없습니다. 포드 어피니티를 사용하려면 GKE 연산자를 사용하여 다른 클러스터에서 포드를 실행합니다.

Cloud Composer 3의 KubernetesPodOperator 정보

이 섹션에서는 Cloud Composer 3에서 KubernetesPodOperator가 작동하는 방식을 설명합니다.

리소스 사용량

Cloud Composer 3에서 환경 클러스터는 자동으로 확장됩니다. KubernetesPodOperator를 사용하여 실행하는 추가 워크로드는 사용자 환경과 독립적으로 확장됩니다. 사용자 환경이 증가한 리소스 수요의 영향을 받지 않지만 사용자 환경의 클러스터는 리소스 수요에 따라 확장 및 축소됩니다.

사용자 환경의 클러스터에서 실행되는 추가 워크로드의 가격 책정은 Cloud Composer 3 가격 책정 모델을 따르고 Cloud Composer SKU 3개를 사용합니다.

Cloud Composer 3은 컴퓨팅 클래스의 개념을 도입하는 Autopilot 클러스터를 사용합니다.

  • Cloud Composer는 general-purpose 컴퓨팅 클래스만 지원합니다.

  • 기본적으로 클래스를 선택하지 않으면 KubernetesPodOperator를 사용하여 포드를 만들 때 general-purpose 클래스로 간주됩니다.

  • 각 클래스는 특정 속성 및 리소스 한도와 연결됩니다. 자세한 내용은 Autopilot 문서를 참조하세요. 예를 들어 general-purpose 클래스 내에서 실행되는 포드는 최대 110GiB 메모리를 사용할 수 있습니다.

프로젝트 리소스에 대한 액세스 권한

Cloud Composer 3에서 사용자 환경의 클러스터는 테넌트 프로젝트에 있으며, 포드는 격리된 네임스페이스에서 환경의 클러스터에서 실행됩니다.

Cloud Composer 3에서 포드는 다른 네임스페이스가 지정된 경우에도 항상 composer-user-workloads 네임스페이스에서 실행됩니다. 이 네임스페이스의 포드는 추가 구성 없이 프로젝트와 VPC 네트워크(사용 설정된 경우)의 Google Cloud 리소스에 액세스할 수 있습니다. 사용자 환경의 서비스 계정은 이러한 리소스에 액세스하는 데 사용됩니다. 다른 서비스 계정을 지정할 수는 없습니다.

최소 구성

KubernetesPodOperator를 만들려면 포드의 name, 사용할 image, task_id 매개변수만 필요합니다. /home/airflow/composer_kube_config에는 GKE에 인증할 사용자 인증 정보가 포함되어 있습니다.

kubernetes_min_pod = KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes. In Composer 2 environments
    # after December 2022, the default namespace is
    # `composer-user-workloads`. Always use the
    # `composer-user-workloads` namespace with Composer 3.
    namespace="composer-user-workloads",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

추가 구성

이 예시에서는 KubernetesPodOperator에서 구성할 수 있는 추가 매개변수를 보여줍니다.

매개변수에 관한 자세한 내용은 KubernetesPodOperator의 Airflow 참조를 확인하세요. Kubernetes 보안 비밀 및 ConfigMap 사용에 관한 자세한 내용은 Kubernetes 보안 비밀 및 ConfigMap 사용을 참조하세요. KubernetesPodOperator와 함께 Jinja 템플릿 사용에 관한 자세한 내용은 Jinja 템플릿 사용을 참조하세요.

kubernetes_full_pod = KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="composer-user-workloads",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The Docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 600.
    startup_timeout_seconds=600,
    # The environment variables to be initialized in the container.
    # The env_vars parameter is templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 2.3 and the cncf providers package 5.0.0
    # resources were passed as a dictionary. This change was made in
    # https://github.com/apache/airflow/pull/27197
    # Additionally, "memory" and "cpu" were previously named
    # "limit_memory" and "limit_cpu"
    # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
    container_resources=k8s_models.V1ResourceRequirements(
        requests={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
        limits={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
    ),
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    # Pod affinity with the KubernetesPodOperator
    # is not supported with Composer 2
    # instead, create a cluster and use the GKEStartPodOperator
    # https://cloud.google.com/composer/docs/using-gke-operator
    affinity={},
)

Jinja 템플릿 사용

Airflow는 DAG에서 Jinja 템플릿을 지원합니다.

연산자와 함께 필수 Airflow 매개변수(task_id, name, image)를 선언해야 합니다. 다음 예시와 같이 Jinja를 사용하여 다른 모든 매개변수(cmds, arguments, env_vars, config_file)를 템플릿으로 만들 수 있습니다.

예시의 env_vars 매개변수는 my_value라는 Airflow 변수에서 설정됩니다. 예시 DAG는 Airflow의 vars 템플릿 변수에서 값을 가져옵니다. Airflow에는 다양한 유형의 정보에 액세스할 수 있는 더 많은 변수가 있습니다. 예를 들어 conf 템플릿 변수를 사용하여 Airflow 구성 옵션의 값에 액세스할 수 있습니다. 자세한 내용과 Airflow에서 사용할 수 있는 변수 목록은 Airflow 문서의 템플릿 참조를 확인하세요.

DAG를 변경하거나 env_vars 변수를 만들지 않으면 변수가 없으므로 예시의 ex-kube-templates 태스크가 실패합니다. Airflow UI 또는 Google Cloud CLI에서 이 변수를 만듭니다.

  1. Airflow UI로 이동합니다.

  2. 툴바에서 관리 > 변수를 선택합니다.

  3. 변수 나열 페이지에서 새 레코드 추가를 클릭합니다.

  4. 변수 추가 페이지에서 다음 정보를 입력합니다.

    • 키: my_value
    • Val: example_value
  5. 저장을 클릭합니다.

다음 명령어를 입력합니다.

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables set -- \
    my_value example_value

다음과 같이 바꿉니다.

  • ENVIRONMENT: 환경 이름
  • LOCATION을 환경이 위치한 리전으로 바꿉니다.

다음 예시는 KubernetesPodOperator와 함께 Jinja 템플릿을 사용하는 방법을 보여줍니다.

kubernetes_template_ex = KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="composer-user-workloads",
    image="bash",
    # All parameters below can be templated with Jinja. For more information
    # and the list of variables available in Airflow, see
    # the Airflow templates reference:
    # https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in Jinja is the execution date as YYYY-MM-DD, this Docker image
    # will echo the execution date. Arguments to the entrypoint. The Docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI. The env_vars parameter
    # is templated.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Specifies path to Kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Kubernetes 보안 비밀 및 ConfigMap 사용

Kubernetes 보안 비밀은 민감한 정보를 포함하는 객체입니다. Kubernetes ConfigMap은 키-값 쌍에 비기밀 데이터를 포함하는 객체입니다.

Cloud Composer 3에서는 Google Cloud CLI, API 또는 Terraform을 사용하여 보안 비밀 및 ConfigMap을 만든 다음 KubernetesPodOperator에서 액세스할 수 있습니다.

  • Google Cloud CLI 및 API를 사용하여 YAML 구성 파일을 제공합니다.
  • Terraform을 사용하여 Terraform 구성 파일에서 보안 비밀과 ConfigMap을 별도의 리소스로 정의합니다.

YAML 구성 파일 정보

Google Cloud CLI 및 API를 사용하여 Kubernetes 보안 비밀 또는 ConfigMap을 만들 때 YAML 형식의 파일을 제공합니다. 이 파일은 Kubernetes 보안 비밀 및 ConfigMap에서 사용하는 것과 동일한 형식을 따라야 합니다. Kubernetes 문서에서는 ConfigMap 및 보안 비밀의 여러 코드 샘플을 제공합니다. 시작하려면 보안 비밀을 사용하여 사용자 인증 정보를 안전하게 배포 페이지와 ConfigMaps을 참조하세요.

Kubernetes 보안 비밀과 동일하게 보안 비밀에서 값을 정의할 때는 base64 표현을 사용합니다.

값을 인코딩하려면 다음 명령어를 사용할 수 있습니다. 이는 base64로 인코딩된 값을 가져오는 여러 가지 방법 중 하나입니다.

echo "postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db" -n | base64

출력:

cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

이 가이드 뒷부분의 샘플에는 다음 두 개의 YAML 파일 예시가 사용됩니다. Kubernetes 보안 비밀의 YAML 구성 파일 예시:

apiVersion: v1
kind: Secret
metadata:
  name: airflow-secrets
data:
  sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

파일을 포함하는 방법을 보여주는 또 다른 예시입니다. 이전 예시와 마찬가지로 먼저 파일(cat ./key.json | base64)의 콘텐츠를 인코딩한 다음 YAML 파일에 이 값을 제공합니다.

apiVersion: v1
kind: Secret
metadata:
  name: service-account
data:
  service-account.json: |
    ewogICJ0eXBl...mdzZXJ2aWNlYWNjb3VudC5jb20iCn0K

ConfigMap의 YAML 구성 파일 예시. ConfigMap에서는 base64 표현을 사용할 필요가 없습니다.

apiVersion: v1
kind: ConfigMap
metadata:
  name: example-configmap
data:
  example_key: example_value

Kubernetes 보안 비밀 관리

보안 비밀 만들기

Kubernetes 보안 비밀을 만들려면 다음 명령어를 실행합니다.

gcloud beta composer environments user-workloads-secrets create \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --secret-file-path SECRET_FILE

다음을 바꿉니다.

  • ENVIRONMENT_NAME: 환경의 이름입니다.
  • LOCATION: 환경이 위치한 리전
  • SECRET_FILE: 보안 비밀의 구성이 포함된 로컬 YAML 파일의 경로입니다.

예를 들면 다음과 같습니다.

gcloud beta composer environments user-workloads-secrets create \
  --environment example-environment \
  --location us-central1 \
  --secret-file-path ./secrets/example-secret.yaml

보안 비밀 업데이트

Kubernetes 보안 비밀을 업데이트하려면 다음 명령어를 실행합니다. 보안 비밀의 이름은 지정된 YAML 파일에서 가져오고 보안 비밀의 콘텐츠는 바뀝니다.

gcloud beta composer environments user-workloads-secrets update \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --secret-file-path SECRET_FILE

다음을 바꿉니다.

  • ENVIRONMENT_NAME: 환경의 이름입니다.
  • LOCATION: 환경이 위치한 리전
  • SECRET_FILE: 보안 비밀의 구성이 포함된 로컬 YAML 파일의 경로입니다. 이 파일의 metadata > name 필드에 보안 비밀의 이름을 지정합니다.

보안 비밀 나열

환경의 보안 비밀 및 필드 목록을 가져오려면 다음 명령어를 실행합니다. 출력에서 키 값은 별표로 바뀝니다.

gcloud beta composer environments user-workloads-secrets list \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

다음을 바꿉니다.

  • ENVIRONMENT_NAME: 환경의 이름입니다.
  • LOCATION: 환경이 위치한 리전

보안 비밀 세부정보 가져오기

보안 비밀에 대한 자세한 정보를 확인하려면 다음 명령어를 실행합니다. 출력에서 키 값은 별표로 바뀝니다.

gcloud beta composer environments user-workloads-secrets describe \
  SECRET_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

다음을 바꿉니다.

  • SECRET_NAME: YAML 파일의 metadata > name 필드에 보안 비밀의 구성과 함께 정의된 보안 비밀의 이름입니다.
  • ENVIRONMENT_NAME: 환경의 이름입니다.
  • LOCATION: 환경이 위치한 리전

보안 비밀 삭제

보안 비밀을 삭제하려면 다음 명령어를 실행합니다.

gcloud beta composer environments user-workloads-secrets delete \
  SECRET_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION
  • SECRET_NAME: YAML 파일의 metadata > name 필드에 보안 비밀의 구성과 함께 정의된 보안 비밀의 이름입니다.
  • ENVIRONMENT_NAME: 환경의 이름입니다.
  • LOCATION: 환경이 위치한 리전입니다.

보안 비밀 만들기

  1. environments.userWorkloadsSecrets.create API 요청을 만듭니다.

  2. 이 요청의 작성 방법:

    1. 요청 본문의 name 필드에 새 보안 비밀의 URI를 지정합니다.
    2. 요청 본문의 data 필드에 보안 비밀의 키와 base64로 인코딩된 값을 지정합니다.

예를 들면 다음과 같습니다.

// POST https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret",
  "data": {
    "example": "ZXhhbXBsZV92YWx1ZSAtbgo="
  }
}

보안 비밀 업데이트

  1. environments.userWorkloadsSecrets.update API 요청을 만듭니다.

  2. 이 요청의 작성 방법:

    1. 요청 본문의 name 필드에 보안 비밀의 URI를 지정합니다.
    2. 요청 본문의 data 필드에 보안 비밀의 키와 base64로 인코딩된 값을 지정합니다. 값이 바뀝니다.

예를 들면 다음과 같습니다.

// PUT https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret",
  "data": {
    "example": "ZXhhbXBsZV92YWx1ZSAtbgo=",
    "another-example": "YW5vdGhlcl9leGFtcGxlX3ZhbHVlIC1uCg=="
  }
}

보안 비밀 나열

environments.userWorkloadsSecrets.list API 요청을 만듭니다. 출력의 키 값은 별표로 바뀝니다. 이 요청에서 페이지로 나누기를 사용할 수 있습니다. 자세한 내용은 요청 참조를 확인하세요.

예를 들면 다음과 같습니다.

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets

보안 비밀 세부정보 가져오기

environments.userWorkloadsSecrets.get API 요청을 만듭니다. 출력의 키 값은 별표로 바뀝니다.

예를 들면 다음과 같습니다.

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret

보안 비밀 삭제

environments.userWorkloadsSecrets.delete API 요청을 만듭니다.

예를 들면 다음과 같습니다.

// DELETE https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret

google_composer_user_workloads_secret 리소스는 data 블록에 정의된 키와 값이 있는 Kubernetes 보안 비밀을 정의합니다.

resource "google_composer_user_workloads_secret" "example_secret" {
  provider = google-beta
  environment = google_composer_environment.ENVIRONMENT_RESOURCE_NAME.name
  name = "SECRET_NAME"
  region = "LOCATION"

  data = {
    KEY_NAME: "KEY_VALUE"
  }
}
  • ENVIRONMENT_RESOURCE_NAME: Terraform에 환경의 정의를 포함하는 환경 리소스의 이름입니다. 실제 환경의 이름도 이 리소스에 지정됩니다.
  • LOCATION: 환경이 위치한 리전
  • SECRET_NAME: 보안 비밀의 이름입니다.
  • KEY_NAME: 이 보안 비밀의 하나 이상의 키입니다.
  • KEY_VALUE: 키의 base64로 인코딩된 값입니다. base64encode 함수를 사용하여 값을 인코딩할 수 있습니다(예시 참조).

다음 두 가지 Kubernetes 보안 비밀 예시는 이 가이드의 뒷부분에 나오는 샘플에서 사용됩니다.

resource "google_composer_user_workloads_secret" "example_secret" {
  provider = google-beta

  name = "airflow-secrets"

  environment = google_composer_environment.example_environment.name
  region = "us-central1"

  data = {
    sql_alchemy_conn: base64encode("postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db")
  }
}

파일을 포함하는 방법을 보여주는 또 다른 예시입니다. file 함수를 사용하여 파일의 콘텐츠를 문자열로 읽은 다음 base64로 인코딩할 수 있습니다.

resource "google_composer_user_workloads_secret" "service_account_secret" {
  provider = google-beta

  name = "service-account"

  environment = google_composer_environment.example_environment.name
  region = "us-central1"

  data = {
    "service-account.json": base64encode(file("./key.json"))
  }
}

DAG에서 Kubernetes 보안 비밀 사용

이 예시에서는 Kubernetes 보안 비밀을 환경 변수와 포드가 마운트한 볼륨으로 사용하는 두 가지 방법을 보여줍니다.

Airflow 또는 Cloud Composer 환경 변수와 반대로 첫 번째 보안 비밀 airflow-secretsSQL_CONN이라는 Kubernetes 환경 변수로 설정됩니다.

두 번째 보안 비밀 service-account는 서비스 계정 토큰이 있는 파일인 service-account.json/var/secrets/google에 마운트합니다.

보안 비밀 객체는 다음과 같습니다.

secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

첫 번째 Kubernetes 보안 비밀의 이름은 secret_env 변수에 정의됩니다. 이 보안 비밀의 이름은 airflow-secrets입니다. deploy_type 매개변수에서는 이 보안 비밀이 환경 변수로 노출되어야 한다는 것을 지정합니다. 환경 변수의 이름은 deploy_target 매개변수에 지정된 대로 SQL_CONN입니다. 마지막으로 SQL_CONN 환경 변수의 값이 sql_alchemy_conn 키의 값으로 설정됩니다.

두 번째 Kubernetes 보안 비밀의 이름은 secret_volume 변수에 정의됩니다. 이 보안 비밀의 이름은 service-account입니다. 이 보안 비밀은 deploy_type 매개변수에 지정된 대로 볼륨으로 노출됩니다. 마운트할 파일 deploy_target의 경로는 /var/secrets/google입니다. 마지막으로 deploy_target에 저장된 보안 비밀의 keyservice-account.json입니다.

연산자 구성은 다음과 같습니다.

kubernetes_secret_vars_ex = KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="composer-user-workloads",
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # env_vars allows you to specify environment variables for your
    # container to use. The env_vars parameter is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json",
    },
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Kubernetes ConfigMap 관리

ConfigMap 만들기

ConfigMap을 만들려면 다음 명령어를 실행합니다.

gcloud beta composer environments user-workloads-config-maps create \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --config-map-file-path CONFIG_MAP_FILE

다음을 바꿉니다.

  • ENVIRONMENT_NAME: 환경의 이름입니다.
  • LOCATION: 환경이 위치한 리전
  • CONFIG_MAP_FILE: ConfigMap의 구성이 포함된 로컬 YAML 파일의 경로입니다.

예를 들면 다음과 같습니다.

gcloud beta composer environments user-workloads-config-maps create \
  --environment example-environment \
  --location us-central1 \
  --config-map-file-path ./configs/example-configmap.yaml

ConfigMap 업데이트

ConfigMap을 업데이트하려면 다음 명령어를 실행합니다. ConfigMap의 이름은 지정된 YAML 파일에서 가져오고 ConfigMap의 콘텐츠는 바뀝니다.

gcloud beta composer environments user-workloads-config-maps update \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --config-map-file-path CONFIG_MAP_FILE

다음을 바꿉니다.

  • ENVIRONMENT_NAME: 환경의 이름입니다.
  • LOCATION: 환경이 위치한 리전
  • CONFIG_MAP_FILE: ConfigMap의 구성이 포함된 로컬 YAML 파일의 경로입니다. 이 파일의 metadata > name 필드에 ConfigMap의 이름을 지정합니다.

ConfigMap 나열

환경의 ConfigMap 및 필드 목록을 가져오려면 다음 명령어를 실행합니다. 출력에서 키 값은 그대로 표시됩니다.

gcloud beta composer environments user-workloads-config-maps list \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

다음을 바꿉니다.

  • ENVIRONMENT_NAME: 환경의 이름입니다.
  • LOCATION: 환경이 위치한 리전

ConfigMap 세부정보 가져오기

ConfigMap에 대한 자세한 정보를 확인하려면 다음 명령어를 실행합니다. 출력에서 키 값은 그대로 표시됩니다.

gcloud beta composer environments user-workloads-config-maps describe \
  CONFIG_MAP_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

다음을 바꿉니다.

  • CONFIG_MAP_NAME: YAML 파일의 metadata > name 필드에 ConfigMap의 구성과 함께 정의된 ConfigMap의 이름입니다.
  • ENVIRONMENT_NAME: 환경의 이름입니다.
  • LOCATION: 환경이 위치한 리전

ConfigMap 삭제

ConfigMap을 삭제하려면 다음 명령어를 실행합니다.

gcloud beta composer environments user-workloads-config-maps delete \
  CONFIG_MAP_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION
  • CONFIG_MAP_NAME: YAML 파일의 metadata > name 필드에 ConfigMap의 구성과 함께 정의된 ConfigMap의 이름입니다.
  • ENVIRONMENT_NAME: 환경의 이름입니다.
  • LOCATION: 환경이 위치한 리전입니다.

ConfigMap 만들기

  1. environments.userWorkloadsConfigMaps.create API 요청을 만듭니다.

  2. 이 요청의 작성 방법:

    1. 요청 본문의 name 필드에 새 ConfigMap의 URI를 지정합니다.
    2. 요청 본문의 data 필드에 ConfigMap의 키와 값을 지정합니다.

예를 들면 다음과 같습니다.

// POST https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap",
  "data": {
    "example_key": "example_value"
  }
}

ConfigMap 업데이트

  1. environments.userWorkloadsConfigMaps.update API 요청을 만듭니다.

  2. 이 요청의 작성 방법:

    1. 요청 본문의 name 필드에 ConfigMap의 URI를 지정합니다.
    2. 요청 본문의 data 필드에 ConfigMap의 키와 값을 지정합니다. 값이 바뀝니다.

예를 들면 다음과 같습니다.

// PUT https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap",
  "data": {
    "example_key": "example_value",
    "another_key": "another_value"
  }
}

ConfigMap 나열

environments.userWorkloadsConfigMaps.list API 요청을 만듭니다. 출력에서 키 값은 있는 그대로 표시됩니다. 이 요청에서 페이지로 나누기를 사용할 수 있습니다. 자세한 내용은 요청 참조를 확인하세요.

예를 들면 다음과 같습니다.

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps

ConfigMap 세부정보 가져오기

environments.userWorkloadsConfigMaps.get API 요청을 만듭니다. 출력에서 키 값은 그대로 표시됩니다.

예를 들면 다음과 같습니다.

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap

ConfigMap 삭제

environments.userWorkloadsConfigMaps.delete API 요청을 만듭니다.

예를 들면 다음과 같습니다.

// DELETE https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap

google_composer_user_workloads_config_map 리소스는 data 블록에 정의된 키와 값이 있는 ConfigMap을 정의합니다.

resource "google_composer_user_workloads_config_map" "example_config_map" {
  provider = google-beta
  environment = google_composer_environment.ENVIRONMENT_RESOURCE_NAME.name
  name = "CONFIG_MAP_NAME"
  region = "LOCATION"

  data = {
    KEY_NAME: "KEY_VALUE"
  }
}
  • ENVIRONMENT_RESOURCE_NAME: Terraform에 환경의 정의를 포함하는 환경 리소스의 이름입니다. 실제 환경의 이름도 이 리소스에 지정됩니다.
  • LOCATION: 환경이 위치한 리전
  • CONFIG_MAP_NAME: ConfigMap의 이름입니다.
  • KEY_NAME: 이 ConfigMap의 하나 이상의 키입니다.
  • KEY_VALUE: 키의 값입니다.

예를 들면 다음과 같습니다.

resource "google_composer_user_workloads_config_map" "example_config_map" {
  provider = google-beta

  name = "example-config-map"

  environment = google_composer_environment.example_environment.name
  region = "us-central1"

  data = {
    "example_key": "example_value"
  }
}

DAG에서 ConfigMap 사용

이 예시는 DAG에서 ConfigMap을 사용하는 방법을 보여줍니다.

다음 예시에서 ConfigMap은 configmaps 매개변수로 전달됩니다. 이 ConfigMap의 모든 키는 환경 변수로 사용할 수 있습니다.

import datetime

from airflow import models
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

with models.DAG(
    dag_id="composer_kubernetes_pod_configmap",
    schedule_interval=None,
    start_date=datetime.datetime(2024, 1, 1),
) as dag:

  KubernetesPodOperator(
    task_id='kpo_configmap_env_vars',
    image='busybox:1.28',
    cmds=['sh'],
    arguments=[
        '-c',
        'echo "Value: $example_key"',
    ],
    configmaps=["example-configmap"],
    config_file="/home/airflow/composer_kube_config",
  )

다음 예시는 ConfigMap을 볼륨으로 마운트하는 방법을 보여줍니다.

import datetime

from airflow import models
from kubernetes.client import models as k8s
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

volume_mount = k8s.V1VolumeMount(name='confmap-example',
  mount_path='/config',
  sub_path=None,
  read_only=False)

volume = k8s.V1Volume(name='confmap-example',
  config_map=k8s.V1ConfigMapVolumeSource(name='example-configmap'))

with models.DAG(
    dag_id="composer_kubernetes_pod_configmap",
    schedule_interval=None,
    start_date=datetime.datetime(2024, 1, 1),
) as dag:

  KubernetesPodOperator(
    task_id='kpo_configmap_volume_mount',
    image='busybox:1.28',
    cmds=['sh'],
    arguments=[
        '-c',
        'ls /config'
    ],
    volumes=[volume],
    volume_mounts=[volume_mount],
    configmaps=["example-configmap"],
    config_file="/home/airflow/composer_kube_config",
  )

CNCF Kubernetes Provider 정보

KubernetesPodOperator는 apache-airflow-providers-cncf-kubernetes Provider에 구현됩니다.

CNCF Kubernetes Provider의 자세한 출시 노트는 CNCF Kubernetes Provider 웹사이트를 참조하세요.

문제 해결

이 섹션에서는 일반적인 KubernetesPodOperator 문제를 해결하기 위한 조언을 제공합니다.

로그 보기

문제를 해결할 때는 다음 순서로 로그를 확인할 수 있습니다.

  1. Airflow 태스크 로그:

    1. Google Cloud 콘솔에서 환경 페이지로 이동합니다.

      환경으로 이동

    2. 환경 목록에서 환경 이름을 클릭합니다. 환경 세부정보 페이지가 열립니다.

    3. DAG 탭으로 이동합니다.

    4. DAG 이름을 클릭한 다음 DAG 실행을 클릭하여 세부정보와 로그를 확인합니다.

  2. Airflow 스케줄러 로그:

    1. 환경 세부정보 페이지로 이동합니다.

    2. 로그 탭으로 이동합니다.

    3. Airflow 스케줄러 로그를 검사합니다.

  3. 사용자 워크로드 로그:

    1. 환경 세부정보 페이지로 이동합니다.

    2. Monitoring 탭으로 이동합니다.

    3. 사용자 워크로드를 선택합니다.

    4. 실행된 워크로드 목록을 검사합니다. 각 워크로드의 로그와 리소스 사용률 정보를 볼 수 있습니다.

0이 아닌 반환 코드

KubernetesPodOperator(및 GKEStartPodOperator)를 사용할 때 컨테이너 진입점의 반환 코드를 통해 작업의 성공 여부를 알 수 있습니다. 0이 아닌 반환 코드는 실패를 나타냅니다.

일반적인 패턴은 셸 스크립트를 컨테이너 진입점으로 실행하여 컨테이너 내에서 여러 작업을 함께 그룹화하는 것입니다.

이러한 스크립트를 작성 중인 경우 스크립트 상단에 set -e 명령어를 사용하여 스크립트의 실패한 명령어가 스크립트를 종료하고 실패를 Airflow 작업 인스턴스에 적용하도록 합니다.

포드 제한 시간

KubernetesPodOperator의 기본 제한 시간은 120초이므로 큰 이미지를 다운로드하기 전에 시간 초과가 발생할 수 있습니다. 제한 시간을 변경하려면 KubernetesPodOperator 생성 시 startup_timeout_seconds 매개변수를 변경합니다.

포드가 시간 초과되면 Airflow UI에서 작업별 로그를 사용할 수 있습니다. 예를 들면 다음과 같습니다.

Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

Cloud Composer 서비스 계정에 작업을 수행하는 데 필요한 IAM 권한이 없는 경우에도 포드 시간 초과가 발생할 수 있습니다. 이를 확인하려면 GKE 대시보드를 사용하여 포드 수준 오류를 보고 특정 워크로드의 로그를 확인하거나 Cloud Logging을 사용하세요.

다음 단계