Pub/Sub Lite から Google Cloud Managed Service for Apache Kafka に移行する

このドキュメントでは、Pub/Sub Lite トピックから Managed Service for Apache Kafka トピックにデータを移行する方法について説明します。

始める前に

  1. Google Kubernetes Engine API を有効にします。
  2. 移行する Pub/Sub Lite トピックを特定します。Google Cloud Managed Service for Apache Kafka のターゲット トピックの名前を決定します。また、移行先の Managed Service for Apache Kafka クラスタも決定します。

移行ワークフロー

データを移行するには、次のタスクを完了します。これらのタスクの詳細については、このページの後半で説明します。

  1. Google Kubernetes Engine サービス アカウントを作成する。
  2. Google Kubernetes Engine クラスタを作成する。
  3. トピックの詳細な構成を使用してDocker イメージをカスタマイズする。
  4. Docker イメージを Google Kubernetes Engine クラスタにデプロイする。

    イメージ内で、Kafka Connect と Kafka Connect 用の Pub/Sub Lite プラグインを使用して、新しい Pub/Sub Lite サブスクリプションにサブスクライブし、Managed Service for Apache Kafka にパブリッシュします。

Google Kubernetes Engine サービス アカウントを作成する

このセクションでは、Google Kubernetes Engine クラスタを実行するために必要な権限を持つ IAM サービス アカウントを作成する方法について説明します。

  1. Google Cloud コンソールで、Google Kubernetes Engine の運用に必要な最小権限を持つ新しい IAM サービス アカウントを作成します。

  2. サービス アカウントに次の IAM ロールを付与します。これらのロールは移行プロセスを容易にします。

    • マネージド Kafka クライアント ロール(roles/managedkafka.client
    • Pub/Sub Lite サブスクライバーのロール(roles/pubsublite.subscriber
    • Pub/Sub Lite 閲覧者のロール(roles/pubsublite.Viewer
    • Artifact Registry 読み取りロール(roles/artifactregistry.reader

GKE クラスタを作成する

このセクションでは、前の手順で作成したサービス アカウントを使用する GKE クラスタを作成する方法について説明します。

  1. Google Cloud コンソールで [Google Kubernetes Engine] ページに移動します。

    Google Kubernetes Engine に移動

  2. [ 作成] をクリックします。

    [Autopilot クラスタの作成] ページが表示されます。

  3. [Advanced Settings] タブで、サービス アカウントを前の手順で作成した IAM サービス アカウントに変更します。

  4. 必要に応じて、他の設定を構成します。

  5. クラスタを作成するには、[作成] をクリックします。

Kafka Connect Docker イメージを作成する

このセクションでは、トピック用の Kafka Connect Docker イメージを作成してカスタマイズする方法について説明します。

  1. Pub/Sub Lite 移行の GitHub リポジトリのクローンを作成します。
  2. 前に作成した IAM サービス アカウントに JSON アカウント キーを生成します。

    base64 ツールを使用して JSON キーをエンコードします。たとえば、

    Linux

    base64 -w 0 < my_service_account.json > password.txt
    

    Mac

    base64 < account_key_json > password.txt
    
  3. GitHub リポジトリのシークレット ファイルで、次のファイルを適切な情報で更新して、リポジトリを Google Cloud プロジェクト、Pub/Sub Lite、Kafka にリンクします。

    .gcp/gmk_sasl_service_account → sensitive
    <service-account-name>@<gcp-project>.iam.gserviceaccount.com
    
    .gcp/gmk_sasl_service_account_key → sensitive
    <base64 encoded sasl service account key>
    
    .gcp/kafka_ssl_truststore_location → sensitive
    <full path of the ssl truststore jks file location>
    
    .gcp/kafka_ssl_truststore_password → sensitive
    <password for the ssl truststore jks>
    
    .gcp/gmk_bootstrap_servers → environment specific
    bootstrap.<google-managed-kafka-cluster-name>.<google-managed-kafka-cluster-region name>.managedkafka.<google-managed-cluster-host-project-name>.cloud.goog:9092
    
    .gcp/kafka_connect_group_id → environment specific
    <Kafka Connect group id (unique per worker group) for the Kafka connect workers in distributed mode>
    
    .gcp/kafka_config_storage_topic → environment specific
    <Kafka topic name used by Kafka Connect for tracking the config>
    
    .gcp/kafka_offset_storage_topic → environment specific
    <Kafka topic name used by Kafka Connect for tracking the offsets>
    
    .gcp/kafka_status_storage_topic → environment specific
    <Kafka topic name used by Kafka Connect for tracking the status>
    
    .gcp/kafka_sink_topic → environment specific
    <target sink Kafka topic name used by Kafka Connect for migrating the data from the Pub/Sub Lite topic>
    
    .gcp/pubsub_lite_gcp_project → environment specific
    <Google Cloud project that hosts the Pub/Sub Lite source subscription to be used for migrating the Pub/Sub Lite topic to sink the Kafka topic>
    
    .gcp/pubsub_lite_gcp_location → environment specific
    <Google Cloud location for the Pub/Sub Lite source subscription tor migrate the Pub/Sub Lite topic to sink Kafka topic>
    
    .gcp/pubsub_lite_subscription → environment specific
    <Pub/Sub Lite source subscription name to be used for migrating the pubsub lite topic to Kafka topic>
    
  4. docker/build-image.sh ファイルを実行して Docker イメージをビルドします。

    ./push-image.sh
    
  5. docker/push-image.sh イメージを Google Cloud プロジェクト名で更新します。

  6. docker/push-image.sh ファイルを実行して、イメージを Artifact Registry に push します。

    ./push-image.sh
    

Kafka Connect ワークロードをデプロイする

このセクションでは、Kafka Connect Docker イメージを Google Kubernetes Engine クラスタにデプロイする方法について説明します。

  1. 認証プラグインを使用して kubectl をインストールして構成します。
  2. Google Kubernetes Engine クラスタの kubeconfig を生成します。
  3. Google Kubernetes Engine サービス アカウントを作成し、IAM アカウントの権限を借用するための適切な権限を付与します。

    $KSA_NAME = KUBERNETES_SERVICE_ACCOUNT_NAME
    $PROJECT_ID = GOOGLE_CLOUD_PROJECT_ID
    $IAM_SA_NAME = IAM_SERVICE_ACCOUNT_NAME
    
    kubectl create serviceaccount $KSA_NAME \
        --namespace=default
    
    gcloud iam service-accounts add-iam-policy-binding \
    $IAM_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com \
        --role roles/iam.workloadIdentityUser \
        --member "serviceAccount:$PROJECT_ID.svc.id.goog[default/$KSA_NAME]"
    
    kubectl annotate serviceaccount $KSA_NAME \
        --namespace default \
    iam.gke.io/gcp-service-account=$IAM_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com
    
  4. テキスト エディタで K8s.yaml ファイルを開き、次の値を更新します。

    1. <workflow_name> は、Kafka Connect ワークフローの名前に置き換えます。
    2. <gke_service_account> は、Google Kubernetes Engine サービス アカウント名に置き換えます。
  5. K8s.yaml ファイルを実行します。

    kubectl create -f k8s.yaml
    

    これにより、Google Kubernetes Engine クラスタで実行される Kafka Connect ワークロードが作成され、Pub/Sub Lite コネクタが起動して、Pub/Sub Lite トピックから Google Cloud Managed Service for Apache Kafka にデータが移動されます。

ジョブをモニタリングする

ジョブが実行されている場合は、Kafka Connect REST エンドポイントに接続して検査できます。

  1. Google Cloud コンソールで、[Deployment detail] > [Workload] ページに移動します。
  2. Kubernetes Deployment をクリックします。

    デプロイの詳細ページが開きます。

  3. [サービスの公開] で [公開] をクリックし、ポート 8083 を追加します。

  4. ポート転送を有効にします。

    ポート転送の設定で取得したデフォルトのリンクは、次のような出力を返します。

    {"version":"3.4.0","commit":"2e1947d240607d53","kafka_cluster_id":"6H6qWA0dQnuK31hBPqYUDg"}
    

    リンクに /connectors を追加すると、実行中のコネクタが一覧表示されます。次に例を示します。

    ["PubSubLiteSourceConnector"]
    

    たとえば、このリンク url:8083/connectors/PubSubLiteSourceConnector/status を確認すると、タスクとそのステータスのリストが表示されます。

    {"name":"PubSubLiteSourceConnector","connector":{"state":"RUNNING","worker_id":"10.53.0.157:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"10.53.0.139:8083"},{"id":1,"state":"RUNNING","worker_id":"10.53.0.157:8083"},{"id":2,"state":"RUNNING","worker_id":"10.53.0.139:8083"},{"id":3,"state":"RUNNING","worker_id":"10.53.0.157:8083"},{"id":4,"state":"RUNNING","worker_id":"10.53.0.157:8083"},{"id":5,"state":"RUNNING","worker_id":"10.53.0.139:8083"},{"id":6,"state":"RUNNING","worker_id":"10.53.0.139:8083"},{"id":7,"state":"RUNNING","worker_id":"10.53.0.157:8083"},{"id":8,"state":"RUNNING","worker_id":"10.53.0.139:8083"},{"id":9,"state":"RUNNING","worker_id":"10.53.0.157:8083"}],"type":"source"}
    

段階的な移行

Pub/Sub Lite トピックが Kafka に移行されたら、サブスクライバーとパブリッシャーを移行できます。その手順は次のとおりです。

  1. チャンネル登録者を移行する。Pub/Sub Lite トピックではなく Kafka トピックを消費するようにサブスクライバーを更新します。

    これは、制御された開発環境で段階的に行う必要があります。

    理想的には、2 つのサブスクライバー セットを維持して、Kafka と Pub/Sub Lite の両方から同じメッセージが受信されることを確認する必要があります。正しい動作が確認できたら、Pub/Sub Lite サブスクライバーを廃止できます。

  2. パブリッシャーを移行する。Pub/Sub Lite トピックではなく Kafka トピックに直接パブリッシュするようにパブリッシャーを更新します。

    定期購入者の移行と同様に、制御された開発環境で段階的に行う必要があります。重複データが問題にならない場合は、2 つのパブリッシャーを維持して動作を確認できます。動作を確認したら、Pub/Sub Lite パブリッシャーを廃止します。

  3. すべてのサブスクライバーとパブリッシャーが移行されたら、ワークロードとクラスタを削除して移行ツールを廃止します。

  4. 元の Pub/Sub Lite トピックを削除します。

次のステップ