Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
このページでは、既存の Cloud Composer 1 の DAG、データ、構成の Airflow 2 環境を、Cloud Composer 2(Airflow 2)に移行する方法について説明します。
その他の移行ガイド
移行元 | 移行先 | メソッド | ガイド |
---|---|---|---|
Cloud Composer 1、Airflow 2 | Cloud Composer 2、Airflow 2 | 並列処理、スナップショットを使用 | 移行ガイド(スナップショット) |
Cloud Composer 1、Airflow 1 | Cloud Composer 2、Airflow 2 | 並列処理、スナップショットを使用 | 移行ガイド(スナップショット) |
Cloud Composer 1、Airflow 2 | Cloud Composer 2、Airflow 2 | 並列処理、手動転送 | このガイド(手動移行) |
Cloud Composer 1、Airflow 1 | Cloud Composer 2、Airflow 2 | 並列処理、手動転送 | 手動移行ガイド |
Airflow 1 | Airflow 2 | 並列処理、手動転送 | 手動移行ガイド |
始める前に
- Cloud Composer では、Cloud Composer 1 から Cloud Composer 2 への移し換え移行がサポートされています。Cloud Composer 1 から Cloud Composer 2 への置き換えアップグレードはできません。
- Cloud Composer 1 と Cloud Composer 2 の相違点の一覧を確認してください。
ステップ 1: 構成のオーバーライド、カスタム PyPI パッケージ、環境変数のリストを取得する
コンソール
Cloud Composer 1 環境の、構成のオーバーライド、カスタム PyPI パッケージ、環境変数のリストを取得します。
Google Cloud Console の [環境] ページに移動します。
Cloud Composer 1 環境を選択します。
[環境変数] タブで、環境変数を表示します。
[Airflow 構成のオーバーライド] タブで、構成のオーバーライドを表示します。
[PyPI パッケージ] タブで、カスタム PyPI パッケージを表示します。
gcloud
環境変数のリストを取得するには、次のコマンドを実行します。
gcloud composer environments describe \
COMPOSER_1_ENV \
--location COMPOSER_1_LOCATION \
--format="value(config.softwareConfig.envVariables)"
環境の Airflow 構成のオーバーライドのリストを取得するには、次のコマンドを実行します。
gcloud composer environments describe \
COMPOSER_1_ENV \
--location COMPOSER_1_LOCATION \
--format="value(config.softwareConfig.airflowConfigOverrides)"
カスタム PyPI パッケージのリストを取得するには、次のコマンドを実行します。
gcloud composer environments describe \
COMPOSER_1_ENV \
--location COMPOSER_1_LOCATION \
--format="value(config.softwareConfig.pypiPackages)"
以下のように置き換えます。
COMPOSER_1_ENV
は、Cloud Composer 1 環境の名前。COMPOSER_1_LOCATION
は、Cloud Composer 1 環境が配置されているリージョン。
Terraform
このステップをスキップします。Cloud Composer 1 環境の構成には、環境の構成のオーバーライド、カスタム PyPI パッケージ、環境変数がすでに記載されています。
ステップ 2: Cloud Composer 2 環境を作成する
このステップでは、Cloud Composer 2 環境を作成します。想定されるリソース需要を満たす環境プリセットから始めて、後で環境をさらにスケーリングして最適化できます。
コンソール
gcloud
Terraform
Cloud Composer 2 環境は、Cloud Composer 1 環境の構成に基づいて作成します。
- Cloud Composer 1 環境の構成をコピーします。
- 環境の名前を変更します。
google-beta
プロバイダを使用します。resource "google_composer_environment" "example_environment_composer_2" { provider = google-beta # ... }
config.software_config
ブロックに、Cloud Composer 2 イメージを指定します。software_config { image_version = "composer-2.9.7-airflow-2.9.3" # ... }
まだ行っていない場合は、構成のオーバーライドと環境変数を指定します。
config.software_config.pypi_packages
ブロックに、カスタム PyPI パッケージを指定します。software_config { # ... pypi_packages = { numpy = "" scipy = ">=1.1.0" } }
ステップ 3: PyPI パッケージを Cloud Composer 2 環境にインストールする
Cloud Composer 2 環境を作成したら、その環境にカスタム PyPI パッケージをインストールします。
コンソール
Google Cloud Console の [環境] ページに移動します。
Cloud Composer 2 環境を選択します。
[PyPI パッケージ] タブに移動し、[編集] をクリックします。
Cloud Composer 1 環境から PyPI パッケージに必要なものをコピーします。[保存] をクリックして、環境が更新されるまで待ちます。
gcloud
カスタム PyPI パッケージのリストを含む
requirements.txt
ファイルを作成します。numpy scipy>=1.1.0
環境を更新します。また、
requirements.txt
ファイルは、--update-pypi-packages-from-file
コマンドに渡します。gcloud composer environments update COMPOSER_2_ENV \ --location COMPOSER_2_LOCATION \ --update-pypi-packages-from-file requirements.txt
以下のように置き換えます。
COMPOSER_2_ENV
は、Cloud Composer 2 環境の名前。COMPOSER_2_LOCATION
は、Cloud Composer 2 環境が配置されているリージョン。
Terraform
このステップをスキップします。カスタム PyPI パッケージは、環境の作成時にすでにインストールしています。
ステップ 4: 変数とプールを移行する
Airflow では、変数とプールを JSON ファイルにエクスポートできます。そうすると、これらのファイルを Cloud Composer 2 環境にインポートできます。
このステップで使用する Airflow CLI コマンドは、Airflow ワーカーのローカル ファイルに対して動作します。ファイルをアップロードまたはダウンロードするには、使用している環境の Cloud Storage バケットの /data
フォルダを使用します。このフォルダは Airflow ワーカーの /home/airflow/gcs/data/
ディレクトリに同期されます。Airflow CLI コマンドで、FILEPATH
パラメータに /home/airflow/gcs/data/
を指定します。
gcloud
Cloud Composer 1 環境の変数をエクスポートします。
gcloud composer environments run \ COMPOSER_1_ENV \ --location COMPOSER_1_LOCATION \ variables export -- /home/airflow/gcs/data/variables.json
以下のように置き換えます。
COMPOSER_1_ENV
は、Cloud Composer 1 環境の名前。COMPOSER_1_LOCATION
は、Cloud Composer 1 環境が配置されているリージョン。
Cloud Composer 1 環境のプールをエクスポートします。
gcloud composer environments run \ COMPOSER_1_ENV \ --location COMPOSER_1_LOCATION \ pools export -- /home/airflow/gcs/data/pools.json
以下のように置き換えます。
COMPOSER_1_ENV
は、Cloud Composer 1 環境の名前。COMPOSER_1_LOCATION
は、Cloud Composer 1 環境が配置されているリージョン。
Cloud Composer 2 環境のバケット URI を取得します。
次のコマンドを実行します。
gcloud composer environments describe COMPOSER_2_ENV \ --location COMPOSER_2_LOCATION \ --format="value(config.dagGcsPrefix)"
以下のように置き換えます。
COMPOSER_2_ENV
は、Cloud Composer 2 環境の名前。COMPOSER_2_LOCATION
は、環境が配置されているリージョン。
出力で、
/dags
フォルダを削除します。結果は、Cloud Composer 2 環境のバケットの URI です。たとえば、
gs://us-central1-example-916807e1-bucket/dags
をgs://us-central1-example-916807e1-bucket
に変更します。
変数とプールを含む JSON ファイルを、Cloud Composer 2 環境に移行します。
gcloud composer environments storage data export \ --destination=COMPOSER_2_BUCKET/data \ --environment=COMPOSER_1_ENV \ --location=COMPOSER_1_LOCATION \ --source=variables.json
gcloud composer environments storage data export \ --destination=COMPOSER_2_BUCKET/data \ --environment=COMPOSER_1_ENV \ --location=COMPOSER_1_LOCATION \ --source=pools.json
以下のように置き換えます。
COMPOSER_2_BUCKET
は、前の手順で取得した Cloud Composer 2 環境バケットの URI。COMPOSER_1_ENV
は、Cloud Composer 1 環境の名前。COMPOSER_1_LOCATION
は、Cloud Composer 1 環境が配置されているリージョン。
変数とプールを Cloud Composer 2 にインポートします。
gcloud composer environments run \ COMPOSER_2_ENV \ --location COMPOSER_2_LOCATION \ variables import \ -- /home/airflow/gcs/data/variables.json
gcloud composer environments run \ COMPOSER_2_ENV \ --location COMPOSER_2_LOCATION \ pools import \ -- /home/airflow/gcs/data/pools.json
変数とプールがインポートされたことを確認します。
gcloud composer environments run \ COMPOSER_2_ENV \ --location COMPOSER_2_LOCATION \ variables list
gcloud composer environments run \ COMPOSER_2_ENV \ --location COMPOSER_2_LOCATION \ pools list
バケットから JSON ファイルを削除します。
gcloud composer environments storage data delete \ variables.json \ --environment=COMPOSER_2_ENV \ --location=COMPOSER_2_LOCATION
gcloud composer environments storage data delete \ pools.json \ --environment=COMPOSER_2_ENV \ --location=COMPOSER_2_LOCATION
gcloud composer environments storage data delete \ variables.json \ --environment=COMPOSER_1_ENV \ --location=COMPOSER_1_LOCATION
gcloud composer environments storage data delete \ pools.json \ --environment=COMPOSER_1_ENV \ --location=COMPOSER_1_LOCATION
ステップ 5: Cloud Composer 1 環境のバケットから他のデータを移行する
Cloud Composer 1 環境のバケットからプラグインや他のデータを移行します。
gcloud
プラグインを Cloud Composer 2 環境に移行します。これを行うには、Cloud Composer 1 環境のバケットから Cloud Composer 2 環境のバケットにある
/plugins
フォルダに、プラグインをエクスポートします。gcloud composer environments storage plugins export \ --destination=COMPOSER_2_BUCKET/plugins \ --environment=COMPOSER_1_ENV \ --location=COMPOSER_1_LOCATION
/plugins
フォルダが正常にインポートされたことを確認します。gcloud composer environments storage plugins list \ --environment=COMPOSER_2_ENV \ --location=COMPOSER_2_LOCATION
/data
フォルダを、Cloud Composer 1 環境から Airflow 2 環境にエクスポートします。gcloud composer environments storage data export \ --destination=COMPOSER_2_BUCKET/data \ --environment=COMPOSER_1_ENV \ --location=COMPOSER_1_LOCATION
/data
フォルダが正常にインポートされたことを確認します。gcloud composer environments storage data list \ --environment=COMPOSER_2_ENV \ --location=COMPOSER_2_LOCATION
ステップ 6: 接続を移行する
このステップでは、手動で接続を作成して移行する方法について説明します。
gcloud
Cloud Composer 1 環境の接続のリストを取得するために、次のコマンドを実行します。
gcloud composer environments run \ COMPOSER_1_ENV \ --location COMPOSER_1_LOCATION \ connections list
Cloud Composer 2 環境で新しい接続を作成するために、
gcloud
を使用してconnections
Airflow CLI コマンドを実行します。次に例を示します。gcloud composer environments run \ COMPOSER_2_ENV \ --location COMPOSER_2_LOCATION \ connections add \ -- --conn-host postgres.example.com \ --conn-port 5432 \ --conn-type postgres \ --conn-login example_user \ --conn-password example_password \ --conn-description "Example connection" \ example_connection
ステップ 7: ユーザー アカウントを移行する
このステップでは、手動で接続を作成してユーザーを移行する方法について説明します。
Airflow UI
Cloud Composer 1 環境でユーザーの一覧を表示するには、次のようにします。
Cloud Composer 1 環境の Airflow ウェブ インターフェースを開きます。
[セキュリティ] > [ユーザーを一覧表示] に移動します。
Cloud Composer 2 環境でユーザーを作成するには、次のようにします。
Cloud Composer 2 環境の Airflow ウェブ インターフェースを開きます。
[セキュリティ] > [ユーザーを一覧表示] に移動します。
[新しいレコードの追加] をクリックします。
gcloud
-
Cloud Composer 1 環境でユーザーのリストを表示するには、
gcloud
でusers list
Airflow CLI コマンドを実行します。gcloud composer environments run \ COMPOSER_1_ENV \ --location COMPOSER_1_LOCATION \ users list
以下のように置き換えます。
COMPOSER_1_ENV
は、Cloud Composer 1 環境の名前。COMPOSER_1_LOCATION
は、Cloud Composer 1 環境が配置されているリージョン。
Cloud Composer 2 環境で新しいユーザー アカウントを作成するには、
gcloud
でusers create
Airflow CLI コマンドを実行します。次に例を示します。gcloud composer environments run \ COMPOSER_2_ENV \ --location COMPOSER_2_LOCATION \ users create \ -- --username example_username \ --firstname Example-Name \ --lastname Example-Surname \ --email example-user@example.com \ --use-random-password \ --role Op
以下のように置き換えます。
COMPOSER_2_ENV
は、Cloud Composer 2 環境の名前。COMPOSER_2_LOCATION
は、Cloud Composer 2 環境が配置されているリージョン。- ユーザーのロールを含む、Cloud Composer 1 環境のすべてのユーザー構成パラメータとその値。
別の方法でユーザー アカウントを移行する
別の方法としては、users export
と users import
の Airflow CLI コマンドを使用できます。
環境のバケット
/data
フォルダにあるファイルにユーザー アカウントをエクスポートします。gcloud composer environments run \ COMPOSER_1_ENV \ --location COMPOSER_1_LOCATION \ users export -- /home/airflow/gcs/data/users.json
このファイルを Cloud Composer 2 環境のバケットにエクスポートします。
gcloud composer environments storage data export \ --destination=COMPOSER_2_BUCKET/data \ --environment=COMPOSER_1_ENV \ --location=COMPOSER_1_LOCATION \ --source=users.json
このファイルから Cloud Composer 2 環境にユーザー アカウントをインポートします。
gcloud composer environments run \ COMPOSER_2_ENV \ --location COMPOSER_2_LOCATION \ users import \ -- /home/airflow/gcs/data/users.json
両方の環境で、その JSON ファイルを削除します。
gcloud composer environments storage data delete \ --environment=COMPOSER_1_ENV \ --location=COMPOSER_1_LOCATION \ users.json
gcloud composer environments storage data delete \ --environment=COMPOSER_2_ENV \ --location=COMPOSER_2_LOCATION \ users.json
以下のように置き換えます。
COMPOSER_1_ENV
は、Cloud Composer 1 環境の名前。COMPOSER_2_ENV
は、Cloud Composer 2 環境の名前。COMPOSER_1_LOCATION
は、Cloud Composer 1 環境が配置されているリージョン。COMPOSER_2_LOCATION
は、Cloud Composer 2 環境が配置されているリージョン。COMPOSER_2_BUCKET
は、前の手順で取得した Cloud Composer 2 環境バケットの URI。
ステップ 8: DAG を Cloud Composer 2 環境に移行する
DAG を環境間で転送すると、次の問題が発生する場合があります。
両方の環境で DAG が有効である(一時停止されていない)場合、各環境はスケジュールされたとおりに DAG の独自のコピーを実行します。これにより、同じデータと実行時間の重複した DAG 実行が発生する可能性があります。
DAG のキャッチアップのために、Airflow は DAG で指定された開始日に始まる追加の DAG 実行をスケジュールします。これは、新しい Airflow インスタンスで Cloud Composer 1 環境からの DAG 実行の履歴が考慮されないためです。このため、指定した開始日に始まるように大量の DAG 実行がスケジュールされる可能性があります。
DAG の重複実行を防止する
Cloud Composer 2 環境の Airflow 2 環境で、dags_are_paused_at_creation
オプションに対する Airflow 構成オプションのオーバーライドを追加します。この変更を行うと、デフォルトで新しいすべての DAG が一時停止されます。
セクション | キー | 値 |
---|---|---|
core |
dags_are_paused_at_creation |
True |
DAG 実行の追加や欠落を防ぐ
実行日のギャップや重複を回避するには、Cloud Composer 2 でのキャッチアップを無効にします。このようにして、DAG を Cloud Composer 2 環境にアップロードすると、Airflow では Cloud Composer 1 環境ですでに実行されている DAG 実行がスケジュールされません。catchup_by_default
オプションに対する Airflow 構成オプションのオーバーライドを追加します。
セクション | キー | 値 |
---|---|---|
scheduler |
catchup_by_default |
False |
DAG を Cloud Composer 2 環境に移行する
DAG を Cloud Composer 2 環境に移行するには、次のようにします。
Cloud Composer 1 環境から Cloud Composer 2 環境に DAG をアップロードします。
airflow_monitoring.py
DAG をスキップします。構成のオーバーライドにより、DAG は Cloud Composer 2 環境で一時停止され、DAG 実行はスケジュール設定されません。
Airflow ウェブ インターフェースで、[DAG] に移動し、報告された DAG 構文エラーを確認します。
DAG の転送を計画する時刻:
Cloud Composer 1 環境で、DAG を一時停止します。
Cloud Composer 2 環境で、DAG の一時停止を解除します。
新しい DAG 実行が正しい時間にスケジュール設定されていることを確認します。
Cloud Composer 2 環境で DAG の実行が発生するのを待ち、成功したかどうかを確認します。DAG の実行が成功した場合は、Cloud Composer 1 環境で一時停止を解除しないでください。解除すると、DAG が Cloud Composer 1 の環境で同じ日時に実行されます。
特定の DAG の実行が失敗した場合は、Cloud Composer 2 で正常に実行されるまで DAG のトラブルシューティングを行います。
必要な場合は、次のようにいつでも DAG の Cloud Composer 1 バージョンにフォールバックして、Cloud Composer 2 で失敗した DAG 実行を Cloud Composer 1 環境で実行できます。
Cloud Composer 2 環境で DAG を一時停止します。
Cloud Composer 1 環境で DAG の一時停止を解除します。これにより、Cloud Composer 1 環境で、一時停止された時間の DAG 実行を補う DAG がスケジュールされます。
ステップ 9: Cloud Composer 2 環境をモニタリングする
すべての DAG と構成を Cloud Composer 2 環境に移行した後、潜在的な問題、失敗した DAG 実行、環境全体の健全性をモニタリングします。Cloud Composer 2 環境が、十分な時間、問題なく動作している場合は、Cloud Composer 1 環境の削除を検討してください。