Kafka コンシューマー ワークロードを自動スケーリングする

このチュートリアルでは、Kafka オートスケーラーを Cloud Run サービスとして構成してデプロイする方法について説明します。このオートスケーラーは、Cloud Run ワーカープールのデプロイなど、Kafka コンシューマー ワークロードのスケーリング ロジックを実行します。Kafka オートスケーラーは Kafka クラスタから指標を読み取り、Cloud Run のワーカープールまたはサービスの手動スケーリングを使用して、Kafka コンシューマーのラグ指標に基づいて Kafka コンシューマー ワークロードをスケーリングします。

次の図は、Kafka オートスケーラー サービスが Kafka クラスタから指標を読み取って Kafka コンシューマー ワーカープールを自動スケーリングする方法を示しています。

Kafka オートスケーラー サービスが Kafka から指標を取得し、Kafka コンシューマーを自動スケーリングする

必要なロール

このサービスのデプロイと実行に必要な権限を取得するには、次の IAM ロールを付与するよう管理者に依頼してください。

始める前に

Kafka オートスケーラーを構成して使用するには、次のリソースが必要です。

  • Kafka クラスタ
  • デプロイされたコンシューマー

Kafka クラスタ

デプロイされた Cloud Run コンシューマー

  • Kafka コンシューマー ワークロードは、サービスまたはワーカープールとして Cloud Run にデプロイする必要があります。Kafka クラスタ、トピック、コンシューマー グループに接続するように構成する必要があります。Kafka コンシューマーの例については、Cloud Run Kafka オートスケーラーのコンシューマーの例をご覧ください。
  • コンシューマー ワークロードは、Kafka クラスタと同じ Google Cloud プロジェクトに存在する必要があります。

ベスト プラクティス

  • ダイレクト VPC を使用して、Kafka コンシューマーを VPC ネットワークに接続します。ダイレクト VPC を使用すると、プライベート IP アドレスを使用して Kafka クラスタに接続し、VPC ネットワークでトラフィックを維持できます。
  • コンシューマーがイベントを pull しているかどうかを確認する Kafka コンシューマーの活性ヘルスチェックを構成します。ヘルスチェックは、コンテナがクラッシュしなくても、正常でないインスタンスがイベントの処理を停止した場合に、そのインスタンスが自動的に再起動されるようにします。

Kafka オートスケーラーを構築する

Cloud Build を使用して、Kafka Autoscaler のコンテナ イメージをソースコードからビルドできます。

  1. リポジトリのクローンを作成します。

    git clone https://github.com/GoogleCloudPlatform/cloud-run-kafka-scaler.git
    
  2. リポジトリ フォルダに移動します。

    cd cloud-run-kafka-scaler
    

出力イメージ名を指定するには、含まれている cloudbuild.yaml ファイルの %ARTIFACT_REGISTRY_IMAGE% を更新します(例: us-central1-docker.pkg.dev/my-project/my-repo/my_kafka_autoscaler)。

gcloud builds submit --tag us-central1-docker.pkg.dev/my-project/my-repo/my_kafka_autoscaler

このコマンドは、コンテナ イメージをビルドして Artifact Registry に push します。後で必要になるため、イメージの完全なパス(SCALER_IMAGE_PATH)を記録します。

生成されたイメージはローカルで実行されません。Java ベースイメージの上にレイヤリングすることを目的としています。ローカルで実行するためにコンテナ イメージを再構築する方法など、詳細については、ベースイメージの自動更新を構成するをご覧ください。

Kafka オートスケーラーの構成を定義する

Kafka オートスケーラーは、シークレットを使用して構成できます。オートスケーラーは構成を定期的に更新します。つまり、オートスケーラーを再デプロイしなくても、新しいシークレット バージョンを push して構成を変更できます。

Kafka クライアントのプロパティを構成する

Kafka オートスケーラーをデプロイするときに、シークレットをボリュームとしてマウントすることで、Kafka Admin API への接続を構成できます。

kafka_client_config.txt という名前のファイルを作成し、追加する Kafka 管理クライアント構成プロパティを含めます。bootstrap.servers プロパティは必須です。

bootstrap.servers=BOOTSTRAP_SERVER_LIST

BOOTSTRAP_SERVER_LIST は、Kafka クラスタの HOST:PORT リストに置き換えます。

Kafka 認証を構成する

Kafka サーバーで認証が必要な場合は、必要な構成プロパティを kafka_client_config.txt ファイルに含めます。たとえば、Google OAuth でアプリケーションのデフォルト認証情報を使用して Managed Service for Apache Kafka に接続するには、このシークレットに次のプロパティを含める必要があります。

bootstrap.servers=BOOTSTRAP_SERVER_LIST
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;

BOOTSTRAP_SERVER_LIST は、Kafka クラスタの HOST:PORT リストに置き換えます。

Managed Service for Apache Kafka クラスタでアプリケーションのデフォルト認証情報を使用するには、Kafka オートスケーラー サービス アカウントに Managed Kafka Clientroles/managedkafka.client)ロールを付与する必要があります。

gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/managedkafka.client"

次のように置き換えます。

  • SCALER_SERVICE_ACCOUNT: Kafka オートスケーラー サービス アカウントの名前。
  • PROJECT_ID: Kafka オートスケーラー サービスのプロジェクト ID。

デプロイ時にボリュームとしてマウントされるシークレットを作成するには、kafka_client_config.txt ファイルを使用します。

gcloud secrets create ADMIN_CLIENT_SECRET_NAME --data-file=kafka_client_config.txt

ADMIN_CLIENT_SECRET_NAME は、Kafka 認証シークレットの名前に置き換えます。

スケーリングを構成する

Kafka オートスケーラーは、/scaler-config/scaling ボリュームからスケーリング構成を読み取ります。このボリュームの内容は YAML としてフォーマットする必要があります。この構成では、シークレット ボリュームをマウントすることをおすすめします。

次の構成で scaling_config.yaml という名前のファイルを作成します。

spec:
  scaleTargetRef:
    name: projects/PROJECT_ID/locations/REGION/workerpools/CONSUMER_SERVICE_NAME
 metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: TARGET_CPU_UTILIZATION
        activationThreshold: CPU_ACTIVATION_THRESHOLD
        tolerance: CPU_TOLERANCE
        windowSeconds: CPU_METRIC_WINDOW
  - type: External
    external:
      metric:
        name: consumer_lag
      target:
        type: AverageValue
        averageValue: LAG_THRESHOLD
        activationThreshold: LAG_ACTIVATION_THRESHOLD
        tolerance: LAG_TOLERANCE

次のように置き換えます。

  • PROJECT_ID: 自動スケーリングする Kafka コンシューマー ワークロードのプロジェクト ID。
  • REGION: 自動スケーリングする Kafka コンシューマー ワークロードのリージョン。
  • CONSUMER_SERVICE_NAME: 自動スケーリングする Kafka コンシューマー ワークロードの名前。
  • TARGET_CPU_UTILIZATION: 自動スケーリングの計算に使用する CPU 使用率の目標値。例: 60
  • LAG_THRESHOLD: consumer_lag 指標の自動スケーリングをトリガーするしきい値(例: 1000)。
  • (省略可) CPU_ACTIVATION_THRESHOLD: CPU のアクティベーションしきい値。すべての指標が非アクティブになると、ターゲット コンシューマーはゼロにスケーリングされます。デフォルトは 0 です。
  • (省略可) CPU_TOLERANCE: 指定された範囲内にある場合にスケーリングの変更を防ぐしきい値。ターゲット CPU 使用率の割合で表されます。デフォルトは 0.1 です。
  • (省略可) CPU_METRIC_WINDOW: CPU 使用率の平均を計算する期間(秒単位)。デフォルトは 120 です。
  • (省略可) LAG_ACTIVATION_THRESHOLD: consumer_lag 指標のアクティベーションしきい値。すべての指標が非アクティブになると、ターゲット コンシューマーはゼロにスケーリングされます。デフォルトは 0 です。
  • (省略可) LAG_TOLERANCE: 指定された範囲内にある場合にスケーリングの変更を防ぐしきい値。目標コンシューマー ラグの割合として表されます。デフォルトは 0.1 です。

必要に応じて、behavior: ブロックを使用して高度なスケーリング プロパティを構成できます。このブロックは、Kubernetes HPA スケーリング ポリシーと同じ多くのプロパティをサポートしています。

behavior ブロックを指定しない場合は、次のデフォルト構成が使用されます。

behavior:
  scaleDown:
    stabilizationWindowSeconds: 300
    policies:
    - type: Percent
      value: 50
      periodSeconds: 30
    selectPolicy: Min
  scaleUp:
    stabilizationWindowSeconds: 0
    policies:
    - type: Percent
      value: 100
      periodSeconds: 15
    - type: Instances
      value: 4
      periodSeconds: 15
    selectPolicy: Max

デプロイにマウントされるシークレット ボリュームを作成するには、構成を scaling_config.yaml という名前のファイルにコピーし、次のコマンドを実行します。

gcloud secrets create SCALING_CONFIG_SECRET_NAME --data-file=scaling_config.yaml

SCALING_CONFIG_SECRET_NAME は、スケーリング シークレットの名前に置き換えます。

Kafka オートスケーラーをデプロイする

前提条件を満たしたら、Kafka オートスケーラー サービスとそのサポート インフラストラクチャをデプロイできます。このプロセスを簡素化するために、Terraform モジュールとシェル スクリプトが用意されています。

gcloud

このセクションでは、オートスケーラーを手動でデプロイするために必要な各 gcloud コマンドについて説明します。ほとんどの場合、代わりにシェル スクリプトまたは Terraform モジュールを使用することをおすすめします。

サービス アカウントを作成する

サービス アカウントの要件は、構成した自動スケーリング チェック間隔によって異なります。Kafka オートスケーラーを構成して、柔軟な間隔で自動スケーリング チェックを実行できます。

  • 1 分以上: Cloud Scheduler は、選択された間隔で POST リクエストを使用して自動スケーリング チェックをトリガーします。
  • 1 分未満: Cloud Scheduler は、構成された頻度に基づいて、毎分複数の Cloud Tasks の作成をトリガーします。

1 分以上

Kafka オートスケーラー サービス アカウント

Kafka オートスケーラーのサービス アカウントを作成します。

gcloud iam service-accounts create SCALER_SERVICE_ACCOUNT

SCALER_SERVICE_ACCOUNT は、Kafka オートスケーラー サービス アカウントの名前に置き換えます。

Kafka オートスケーラーが Kafka コンシューマー インスタンスの数を更新するには、次の権限が必要です。

  • Kafka コンシューマー サービス アカウントの iam.serviceaccounts.actAs
  • Kafka コンシューマー イメージを含むリポジトリの roles/artifactregistry.reader
  • run.workerpools.getrun.workerpools.update。これらの権限は、Cloud Run 管理者ロール(roles/run.admin)に含まれています。
  • スケーリングと Kafka 認証の両方のシークレットに roles/secretmanager.secretAccessor
  • Kafka コンシューマー プロジェクトの roles/monitoring.viewer。CPU 使用率の指標を読み取るには、このロールが必要です。
  • Kafka コンシューマー プロジェクトの roles/monitoring.metricWriter。このロールは省略可能ですが、オートスケーラーがカスタム指標を出力してオブザーバビリティを向上させることができます。
gcloud iam service-accounts add-iam-policy-binding CONSUMER_SERVICE_ACCOUNT_EMAIL \
    --member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/iam.serviceAccountUser"

gcloud iam service-accounts add-iam-policy-binding CONSUMER_IMAGE_REPO \
    --member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/artifactregistry.reader" \
    --location=REPO_REGION

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/run.admin"

gcloud secrets add-iam-policy-binding ADMIN_CLIENT_SECRET_NAME \
  --member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
  --role="roles/secretmanager.secretAccessor"

gcloud secrets add-iam-policy-binding SCALING_CONFIG_SECRET_NAME \
  --member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
  --role="roles/secretmanager.secretAccessor"

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/monitoring.viewer" \
    --condition=None

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/monitoring.metricWriter" \
    --condition=None

次のように置き換えます。

  • PROJECT_ID: Kafka オートスケーラー サービスが配置されているプロジェクト ID。
  • CONSUMER_SERVICE_ACCOUNT_EMAIL: Kafka コンシューマーのサービス アカウントのメールアドレス。例: example@PROJECT-ID.iam.gserviceaccount.com
  • SCALER_SERVICE_ACCOUNT: Kafka オートスケーラーのサービス アカウント。
  • ADMIN_CLIENT_SECRET_NAME: Kafka 認証シークレットの名前。
  • SCALING_CONFIG_SECRET_NAME: スケーリング シークレットの名前。
  • CONSUMER_IMAGE_REPO: Kafka コンシューマーのコンテナ イメージを含むリポジトリの ID または完全修飾識別子。
  • REPO_REGION: コンシューマー イメージ リポジトリのロケーション。

1 分未満

Cloud Tasks を設定する

Cloud Scheduler は、1 分以上の間隔でのみトリガーできます。1 分未満の間隔の場合は、Cloud Tasks を使用して Kafka オートスケーラーをトリガーします。Cloud Tasks を設定するには、次のものが必要です。

  • 自動スケーリング チェックタスク用の Cloud Tasks キューを作成します。
  • Cloud Run 起動元ロールを使用して Kafka オートスケーラーを呼び出すために Cloud Tasks が使用するサービス アカウントを作成します。
gcloud tasks queues create CLOUD_TASKS_QUEUE_NAME \
--location=REGION
gcloud iam service-accounts create TASKS_SERVICE_ACCOUNT
gcloud run services add-iam-policy-binding SCALER_SERVICE_NAME \
    --member="serviceAccount:TASKS_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/run.invoker"

次のように置き換えます。

  • CLOUD_TASKS_QUEUE_NAME: 自動スケーリング チェックをトリガーするために構成された Cloud Tasks キュー。
  • TASKS_SERVICE_ACCOUNT: Cloud Tasks が自動スケーリング チェックをトリガーするために使用するサービス アカウント。
  • SCALER_SERVICE_NAME: Kafka オートスケーラー サービスの名前。
  • PROJECT_ID: Kafka オートスケーラー サービスのプロジェクト ID。
  • REGION: Kafka オートスケーラー サービスのロケーション。

Kafka オートスケーラーのサービス アカウントを設定する

Kafka オートスケーラーのサービス アカウントを作成します。

gcloud iam service-accounts create SCALER_SERVICE_ACCOUNT

SCALER_SERVICE_ACCOUNT は、Kafka オートスケーラー サービス アカウントの名前に置き換えます。

Kafka コンシューマー インスタンスの数を更新し、自動スケーリング チェックのタスクを作成するには、Kafka オートスケーラーに次の権限が必要です。

  • Kafka コンシューマー サービス アカウントの iam.serviceaccounts.actAs
  • Kafka コンシューマー イメージを含むリポジトリの roles/artifactregistry.reader
  • run.workerpools.getrun.workerpools.update。これらの権限は、Cloud Run 管理者ロール(roles/run.admin)に含まれています。
  • スケーリングと Kafka 認証の両方のシークレットの roles/secretmanager.secretAccessor
  • Kafka コンシューマー プロジェクトの roles/monitoring.viewer。CPU 使用率の指標を読み取るには、このロールが必要です。
  • Kafka コンシューマー プロジェクトの roles/monitoring.metricWriter。このロールは省略可能ですが、オートスケーラーがカスタム指標を出力してオブザーバビリティを向上させることができます。
  • Cloud Tasks へのデータ追加ロール(roles/cloudtasks.enqueuer)。
gcloud iam service-accounts add-iam-policy-binding CONSUMER_SERVICE_ACCOUNT_EMAIL \
    --member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/iam.serviceAccountUser"

gcloud iam service-accounts add-iam-policy-binding CONSUMER_IMAGE_REPO \
    --member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/artifactregistry.reader" \
    --location=REPO_REGION

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/run.admin"

gcloud secrets add-iam-policy-binding ADMIN_CLIENT_SECRET_NAME \
  --member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
  --role="roles/secretmanager.secretAccessor"

gcloud secrets add-iam-policy-binding SCALING_CONFIG_SECRET_NAME \
  --member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
  --role="roles/secretmanager.secretAccessor"

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/monitoring.viewer" \
    --condition=None

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/monitoring.metricWriter" \
    --condition=None

gcloud tasks queues add-iam-policy-binding CLOUD_TASKS_QUEUE_NAME \
    --member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/cloudtasks.enqueuer" \
    --location=REGION

次のように置き換えます。

  • PROJECT_ID: Kafka オートスケーラー サービスが配置されているプロジェクト ID。
  • CONSUMER_SERVICE_ACCOUNT_EMAIL: Kafka コンシューマーのサービス アカウントのメールアドレス。例: example@PROJECT_ID.iam.gserviceaccount.com
  • SCALER_SERVICE_ACCOUNT: Kafka オートスケーラーのサービス アカウント。
  • CONSUMER_IMAGE_REPO: Kafka コンシューマーのコンテナ イメージを含むリポジトリの ID または完全修飾識別子。
  • ADMIN_CLIENT_SECRET_NAME: Kafka 認証シークレットの名前。
  • SCALING_CONFIG_SECRET_NAME: スケーリング シークレットの名前。
  • REPO_REGION: コンシューマー イメージ リポジトリのロケーション。
  • CLOUD_TASKS_QUEUE_NAME: 自動スケーリング チェックをトリガーするために構成された Cloud Tasks キュー。
  • REGION: Kafka オートスケーラー サービスのロケーション。

環境変数を構成する

1 分以上

Kafka オートスケーラーは、環境変数を使用して、Kafka コンシューマーとターゲット ワークロードのその他の側面を指定します。セキュリティのため、機密情報を シークレットとして構成することをおすすめします。

次の変数を使用して、scaler_env_vars.yaml という名前の YAML ファイルを作成します。

KAFKA_TOPIC_ID: KAFKA_TOPIC_ID
CONSUMER_GROUP_ID: CONSUMER_GROUP_ID
CYCLE_SECONDS: CYCLE_SECONDS
OUTPUT_SCALER_METRICS: OUTPUT_SCALER_METRICS

次のように置き換えます。

  • KAFKA_TOPIC_ID: Kafka コンシューマーがサブスクライブするトピック ID。
  • CONSUMER_GROUP_ID: ターゲット Kafka コンシューマーが使用するコンシューマー グループ ID。これらの値は一致している必要があります。一致していない場合、自動スケーリングは失敗します。
  • CYCLE_SECONDS: オートスケーラーのサイクル期間(秒単位)。
  • OUTPUT_SCALER_METRICS: 指標を有効にする設定。カスタム指標の出力を有効にするには、値を true に設定します。それ以外の場合は false に設定します。

1 分未満

Kafka オートスケーラーは、環境変数を使用して、Kafka コンシューマーとターゲット ワークロードのその他の側面を指定します。セキュリティのため、機密情報を シークレットとして構成することをおすすめします。

次の変数を使用して、scaler_env_vars.yaml という名前の YAML ファイルを作成します。

KAFKA_TOPIC_ID: KAFKA_TOPIC_ID
CONSUMER_GROUP_ID: CONSUMER_GROUP_ID
CYCLE_SECONDS: CYCLE_SECONDS
OUTPUT_SCALER_METRICS: OUTPUT_SCALER_METRICS
FULLY_QUALIFIED_CLOUD_TASKS_QUEUE_NAME: CLOUD_TASKS_QUEUE_NAME
INVOKER_SERVICE_ACCOUNT_EMAIL: TASKS_SERVICE_ACCOUNT_EMAIL

次のように置き換えます。

  • KAFKA_TOPIC_ID: Kafka コンシューマーがサブスクライブするトピック ID。
  • CONSUMER_GROUP_ID: ターゲット Kafka コンシューマーが使用するコンシューマー グループ ID。これらの値は一致している必要があります。一致していない場合、自動スケーリングは失敗します。
  • CYCLE_SECONDS: オートスケーラーのサイクル期間(秒単位)。
  • OUTPUT_SCALER_METRICS: 指標を有効にする設定。カスタム指標の出力を有効にするには、値を true に設定します。それ以外の場合は false に設定します。
  • CLOUD_TASKS_QUEUE_NAME: 自動スケーリング チェックをトリガーする Cloud Tasks キューの完全修飾名。形式は projects/$PROJECT_ID/locations/$REGION/queues/$CLOUD_TASKS_QUEUE_NAME です。
  • TASKS_SERVICE_ACCOUNT_EMAIL: Cloud Tasks が自動スケーリング チェックをトリガーするために使用するサービス アカウント。例: example@PROJECT_ID.iam.gserviceaccount.com

提供されたイメージを使用して Kafka オートスケーラーをデプロイし、scaler_env_vars.yaml ファイルとシークレット ボリューム マウントを使用して Kafka VPC に接続します。

gcloud run deploy SCALER_SERVICE_NAME \
    --image=SCALER_IMAGE_URI \
    --env-vars-file=scaler_env_vars.yaml \
    --service-account=SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com \
    --no-allow-unauthenticated \
    --network=KAFKA_VPC_NETWORK \
    --subnet=KAFKA_VPC_SUBNET \
    --update-secrets=/kafka-config/kafka-client-properties=ADMIN_CLIENT_SECRET_NAME:latest \
    --update-secrets=/scaler-config/scaling=SCALING_CONFIG_SECRET_NAME:latest
    --labels=created-by=kafka-autoscaler

次のように置き換えます。

  • SCALER_IMAGE_URI: Kafka オートスケーラー イメージの URI。
  • SCALER_SERVICE_NAME: Kafka オートスケーラー サービスの名前。
  • SCALER_SERVICE_ACCOUNT: Kafka オートスケーラー サービス アカウントの名前。
  • PROJECT_ID: Kafka オートスケーラー サービスのプロジェクト ID。
  • KAFKA_VPC_NETWORK: Kafka クラスタに接続されている VPC ネットワーク。
  • KAFKA_VPC_SUBNET: Kafka クラスタに接続されている VPC サブネット。
  • ADMIN_CLIENT_SECRET_NAME: Kafka 認証シークレットの名前。
  • SCALING_CONFIG_SECRET_NAME: スケーリング シークレットの名前。

定期的な自動スケーリング チェックを設定する

このセクションでは、Cloud Scheduler を使用して定期的な自動スケーリング チェックをトリガーします。

  • 1 分以上: 選択した間隔でトリガーするように Cloud Scheduler を構成します。
  • 1 分未満: 毎分トリガーするように Cloud Scheduler を構成する
呼び出し元サービス アカウントを作成する

Cloud Scheduler が Kafka オートスケーラーを呼び出せるようにするには、Kafka オートスケーラー サービスに起動元ロール(roles/run.invoker)を持つサービス アカウントを作成する必要があります。

gcloud iam service-accounts create SCALER_INVOKER_SERVICE_ACCOUNT
gcloud run services add-iam-policy-binding SCALER_SERVICE_NAME \
  --member="serviceAccount:SCALER_INVOKER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
  --role="roles/run.invoker"

次のように置き換えます。

  • SCALER_SERVICE_NAME: Kafka オートスケーラー サービスの名前。
  • SCALER_INVOKER_SERVICE_ACCOUNT: 呼び出し元サービス アカウントの名前。
  • PROJECT_ID: Kafka オートスケーラー サービスのプロジェクト ID。
Cloud Scheduler ジョブの作成

1 分以上

選択した自動スケーリング チェック間隔で Cloud Scheduler ジョブを作成します。

gcloud scheduler jobs create http kafka-scaling-check \
    --location=REGION \
    --schedule="CRON_SCHEDULE" \
    --time-zone="TIMEZONE" \
    --uri=https://SCALER_SERVICE_NAME-PROJECT_NUMBER.REGION.run.app \
    --oidc-service-account-email=SCALER_INVOKER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com \
    --http-method=POST

次のように置き換えます。

  • SCALER_SERVICE_NAME: Kafka オートスケーラー サービスの名前。
  • SCALER_INVOKER_SERVICE_ACCOUNT: 呼び出し元サービス アカウントの名前。
  • PROJECT_ID: プロジェクト ID または Kafka オートスケーラー サービス。
  • PROJECT_NUMBER: Kafka オートスケーラー サービスのプロジェクト番号。
  • REGION: Kafka オートスケーラー サービスのロケーション。
  • TIMEZONE: タイムゾーン(例: America/Los_Angeles)。
  • CRON_SCHEDULE: 選択したスケジュール(Crontab 形式)。たとえば、1 分ごとに "* * * * *" とします。

1 分未満

毎分実行される Cloud Scheduler ジョブを作成します。

gcloud scheduler jobs create http kafka-scaling-check \
    --location=REGION \
    --schedule="* * * * *" \
    --time-zone="TIMEZONE" \
    --uri=https://SCALER_SERVICE_NAME-PROJECT_NUMBER.REGION.run.app \
    --oidc-service-account-email=SCALER_INVOKER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com \
    --http-method=POST

次のように置き換えます。

  • SCALER_SERVICE_NAME: Kafka オートスケーラー サービスの名前。
  • SCALER_INVOKER_SERVICE_ACCOUNT: 呼び出し元サービス アカウントの名前。
  • PROJECT_ID: Kafka オートスケーラー サービスのプロジェクト ID。
  • PROJECT_NUMBER: Kafka オートスケーラー サービスのプロジェクト番号。
  • REGION: Kafka オートスケーラー サービスのロケーション。
  • TIMEZONE: タイムゾーン(例: America/Los_Angeles)。

Terraform

terraform/ ディレクトリには、Kafka オートスケーラーとそれに関連するリソースのプロビジョニングに使用できる再利用可能な Terraform モジュールが含まれています。

このモジュールでは、次の作成が自動化されます。

  • Kafka オートスケーラーの Cloud Run サービス
  • サービス アカウントと IAM バインディングのサポート
  • Cloud Tasks キュー
  • Cloud Scheduler ジョブ

詳細な手順、使用例、すべての入出力変数の説明については、terraform readme をご覧ください。

Terraform モジュールに必要な変数を指定する必要があります。これには、プロジェクト ID、リージョン、コンシューマー SA メール、シークレット名、スケーラー イメージパス、トピック ID など、前提条件の詳細が含まれます。

shell

オートスケーラーには、必要なすべてのリソースを自動的に作成して構成する setup_kafka_scaler.sh スクリプトが用意されています。

環境変数を設定する

スクリプトを実行する前に、必要な環境変数をすべて設定していることを確認してください。

# Details for already-deployed Kafka consumer
export PROJECT_ID=PROJECT_ID
export REGION=REGION
export CONSUMER_SERVICE_NAME=DEPLOYED_KAFKA_CONSUMER
export CONSUMER_SA_EMAIL=KAFKA_CONSUMER_ACCOUNT_EMAIL # For example, NAME@PROJECT_ID.iam.gserviceaccount.com
export TOPIC_ID=KAFKA_TOPIC_ID
export CONSUMER_GROUP_ID=KAFKA_CONSUMER_GROUP_ID
export NETWORK=VPC_NETWORK
export SUBNET=VPC_SUBNET

# Details for new items to be created during this setup
export CLOUD_TASKS_QUEUE_NAME=CLOUD_TASKS_QUEUE_FOR_SCALING_CHECKS
export TASKS_SERVICE_ACCOUNT=TASKS_SERVICE_ACCOUNT_NAME

export SCALER_SERVICE_NAME=KAFKA_AUTOSCALER_SERVICE_NAME
export SCALER_IMAGE_PATH=KAFKA_AUTOSCALER_IMAGE_URI
export SCALER_CONFIG_SECRET=KAFKA_AUTOSCALER_CONFIG_SECRET_NAME

export CYCLE_SECONDS=SCALER_CHECK_FREQUENCY # For example, 15; this value should be at least 5 seconds.

export OUTPUT_SCALER_METRICS=false # If you want scaling metrics to outputted to Cloud Monitoring set this to true and ensure your scaler service account has permission to write metrics (for example, via roles/monitoring.metricWriter).

次のように置き換えます。

  • PROJECT_ID: Kafka オートスケーラー サービスが配置されているプロジェクト ID。
  • REGION: Kafka オートスケーラー サービスのロケーション。
  • DEPLOYED_KAFKA_CONSUMER: Kafka コンシューマーの名前。
  • KAFKA_CONSUMER_ACCOUNT_EMAIL: Kafka コンシューマーのサービス アカウントのメールアドレス。
  • KAFKA_TOPIC_ID: Kafka コンシューマーがサブスクライブするトピック ID。
  • KAFKA_CONSUMER_GROUP_ID: ターゲット Kafka コンシューマーが使用するコンシューマー グループ ID。これらの値は一致している必要があります。一致していない場合、自動スケーリングは失敗します。
  • VPC_NETWORK: Kafka クラスタに接続されている VPC ネットワーク。
  • VPC_SUBNET: Kafka クラスタに接続されている VPC サブネット。
  • CLOUD_TASKS_QUEUE_FOR_SCALING_CHECKS: 自動スケーリング チェックをトリガーするために構成された Cloud Tasks キュー。
  • TASKS_SERVICE_ACCOUNT_NAME: Cloud Tasks が自動スケーリング チェックをトリガーするために使用するサービス アカウント。
  • KAFKA_AUTOSCALER_SERVICE_NAME: Kafka オートスケーラー サービスの名前。
  • KAFKA_AUTOSCALER_IMAGE_URI: Kafka オートスケーラー イメージの URI。
  • KAFKA_AUTOSCALER_CONFIG_SECRET_NAME: スケーリング シークレットの名前。
  • SCALER_CHECK_FREQUENCY: オートスケーラーのサイクル期間(秒単位)。

設定スクリプトを実行する

提供された setup_kafka_scaler.sh スクリプトを実行します。

./setup_kafka_scaler.sh

スクリプトは次のアクションを実行します。

  • 自動スケーリング チェックのトリガーに使用される Cloud Tasks キューを作成します。
  • Kafka オートスケーラー サービス アカウントを作成し、必要な権限を付与します。
  • Kafka オートスケーラーを構成してデプロイします。
  • 自動スケーリング チェックを定期的にトリガーする Cloud Scheduler ジョブを作成します。

setup_kafka_scaler.sh スクリプトを実行すると、構成された環境変数が出力されます。続行する前に、環境変数が正しいことを確認してください。

追加の権限を付与する

Kafka コンシューマーのインスタンス数を変更するには、Kafka オートスケーラー サービス アカウントに、デプロイされたコンテナ イメージに対する閲覧権限が必要です。たとえば、コンシューマー イメージが Artifact Registry からデプロイされた場合は、次のコマンドを実行します。

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member="serviceAccount:$SCALER_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com" \
  --role="roles/artifactregistry.reader" # Or appropriate role for your registry

Kafka の自動スケーリングが機能していることを確認する

Kafka オートスケーラー サービスのスケーリングは、サービス URL(SCALER_SERVICE_NAME-PROJECT_NUMBER.REGION.run.app)へのリクエストでトリガーされます。

Kafka オートスケーラー サービスに POST リクエストを送信して、自動スケーリングの計算をトリガーできます。

curl -X POST -H "Authorization: Bearer $(gcloud auth print-identity-token)" https://SCALER_SERVICE_NAME-PROJECT_NUMBER.REGION.run.app

次のように置き換えます。

  • SCALER_SERVICE_NAME: Kafka オートスケーラー サービスの名前。
  • PROJECT_NUMBER: Kafka オートスケーラー サービスのプロジェクト番号。
  • REGION: Kafka オートスケーラー サービスのロケーション。

POST リクエストは、自動スケーリングの計算をトリガーし、ロギングに出力し、推奨事項に基づいてインスタンス数を変更します。

Kafka オートスケーラー サービスのログには、[SCALING] Recommended instances X などのメッセージが含まれている必要があります。

OUTPUT_SCALER_METRICS フラグが有効になっている場合は、custom.googleapis.com/cloud-run-kafkascaler にスケーラーの Cloud Monitoring 指標もあります。

高度なスケーリング構成

spec:
  metrics:
  behavior:
    scaleDown:
      stabilizationWindowSeconds: [INT]
      policies:
      - type: [Percent, Instances]
        value: [INT]
        periodSeconds: [INT]
      selectPolicy: [Min, Max]
    scaleUp:
      stabilizationWindowSeconds: [INT]
      policies:
      - type: [Percent, Instances]
        value: [INT]
        periodSeconds: [INT]
      selectPolicy: [Min, Max]

次のリストに、上記の要素の一部を示します。

  • scaleDown: インスタンス数を減らす(スケールダウン)ときの動作。
  • scaleUp: インスタンス数を増やす(スケールアップ)ときの動作。
  • stabilizationWindowSeconds: ローリング期間で計算されたインスタンス数の最大値(scaleDown)または最小値(scaleUp)。値を 0 に設定すると、最新の計算値が使用されます。
  • selectPolicy: 複数のポリシーが構成されている場合に適用する結果。
  • Min: 最小の変更
  • Max: 最大の変化
  • Percent: 期間ごとの変更は、構成された合計インスタンスの割合に制限されます。
  • Instances: 期間ごとの変更は、構成されたインスタンス数に制限されます。
  • periodSeconds: ポリシーが適用される期間。

たとえば、デフォルトの構成を使用した完全な仕様は次のようになります。

spec:
  scaleTargetRef:
    name: projects/PROJECT-ID/locations/us-central1/workerpools/kafka-consumer-worker
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 60
          activationThreshold: 0
          tolerance: 0.1
          windowSeconds: 120
    - type: External
      external:
        metric:
          name: consumer_lag
        target:
          type: AverageValue
          averageValue: 1000
          activationThreshold: 0
          tolerance: 0.1
  behavior:
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
        - type: Percent
          value: 50
          periodSeconds: 30
      selectPolicy: Min
    scaleUp:
      stabilizationWindowSeconds: 0
      policies:
        - type: Percent
          value: 100
          periodSeconds: 15
        - type: Instances
          value: 4
          periodSeconds: 15
      selectPolicy: Max