本教程介绍如何将 Kafka 自动扩缩器配置并部署为 Cloud Run 服务。此自动扩缩器可为 Kafka 使用方工作负载(例如 Cloud Run 工作器池部署)执行扩缩逻辑。Kafka 自动扩缩器会从 Kafka 集群读取指标,并将手动扩缩用于 Cloud Run 工作器池或服务,以便根据 Kafka 使用方延迟时间指标扩缩 Kafka 使用方工作负载。
下图展示了 Kafka 自动扩缩器服务如何从 Kafka 集群读取指标以自动扩缩 Kafka 使用方工作器池。
所需的角色
如需获得部署和运行此服务所需的权限,请让您的管理员为您授予以下 IAM 角色:
- Cloud Run Developer (
roles/run.developer
) - Service Account User (
roles/iam.serviceAccountUser
) - Artifact Registry Reader (
roles/artifactregistry.reader
) - Cloud Scheduler Admin (
roles/cloudscheduler.admin
),用于创建 Cloud Scheduler 作业以触发自动扩缩检查 - Cloud Tasks Queue Admin (
roles/cloudtasks.queueAdmin
),用于创建 Cloud Tasks 队列以进行自动扩缩检查 - Security Admin (
roles/iam.securityAdmin
),用于向服务账号授予权限
准备工作
如需配置和使用 Kafka 自动扩缩器,您需要准备好以下资源。
- Kafka 集群
- 已部署的使用方
Kafka 集群
- Kafka 集群必须在 Compute Engine、Google Kubernetes Engine 或 Managed Service for Apache Kafka 上运行。
- 已配置的 Kafka 主题,其中包含要发布到该主题的事件。
已部署的 Cloud Run 使用方
- Kafka 使用方工作负载必须作为服务或工作器池部署到 Cloud Run。它必须配置为连接到您的 Kafka 集群、主题和使用方群组。如需查看 Kafka 使用方示例,请参阅 Cloud Run Kafka 自动扩缩器使用方示例。
- 您的使用方工作负载必须与 Kafka 集群位于同一 Google Cloud 项目中。
最佳做法
- 使用直接 VPC 将 Kafka 使用方连接到您的 VPC 网络。借助直接 VPC,您可以使用专用 IP 地址连接到 Kafka 集群,还可以将流量保留在 VPC 网络中。
- 为 Kafka 使用方配置活跃健康检查,以检查使用方是否在拉取事件。该健康检查有助于确保健康状况不佳的实例在停止处理事件时自动重启,即使容器未崩溃也是如此。
构建 Kafka 自动扩缩器
您可以使用 Cloud Build 通过 Kafka 自动扩缩器的源代码构建其容器映像。
克隆代码库:
git clone https://github.com/GoogleCloudPlatform/cloud-run-kafka-scaler.git
前往仓库文件夹:
cd cloud-run-kafka-scaler
如需指定输出映像名称,请更新所含 cloudbuild.yaml
文件中的 %ARTIFACT_REGISTRY_IMAGE%
,例如:us-central1-docker.pkg.dev/my-project/my-repo/my_kafka_autoscaler
。
gcloud builds submit --tag us-central1-docker.pkg.dev/my-project/my-repo/my_kafka_autoscaler
此命令会构建容器映像并将其推送到 Artifact Registry。记录完整的映像路径 (SCALER_IMAGE_PATH
),因为您稍后需要用到它。
请注意,生成的映像不会在本地运行。其意图是在 Java 基础映像上分层。如需了解详情(包括如何重组容器映像以在本地运行),请参阅配置基础映像自动更新。
定义 Kafka 自动扩缩器配置
您可以使用 Secret 配置 Kafka 自动扩缩器。自动扩缩器会定期刷新其配置,这意味着您可以推送新的 Secret 版本来更改配置,而无需重新部署自动扩缩器。
配置 Kafka 客户端属性
您可以在部署 Kafka 自动扩缩器时,通过将 Secret 装载为卷来配置与 Kafka Admin API 的连接。
创建一个名为 kafka_client_config.txt
的文件,并包含您要添加的任何 Kafka Admin 客户端配置属性。bootstrap.servers
属性是必需属性:
bootstrap.servers=BOOTSTRAP_SERVER_LIST
将 BOOTSTRAP_SERVER_LIST 替换为 Kafka 集群的 HOST:PORT
列表。
配置 Kafka 身份验证
如果您的 Kafka 服务器需要身份验证,请在 kafka_client_config.txt
文件中添加必需的配置属性。例如,如需通过将应用默认凭证与 Google OAuth 搭配使用来连接到 Managed Service for Apache Kafka 集群,此 Secret 应包含以下属性:
bootstrap.servers=BOOTSTRAP_SERVER_LIST
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
将 BOOTSTRAP_SERVER_LIST 替换为 Kafka 集群的 HOST:PORT
列表。
将应用默认凭证与 Managed Service for Apache Kafka 集群搭配使用时,还需要向 Kafka 自动扩缩器服务账号授予 Managed Kafka Client (roles/managedkafka.client
) 角色:
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/managedkafka.client"
替换以下内容:
- SCALER_SERVICE_ACCOUNT:Kafka 自动扩缩器服务账号的名称。
- PROJECT_ID:Kafka 自动扩缩器服务的项目 ID。
如需创建将在部署中装载为卷的 Secret,请使用 kafka_client_config.txt
文件:
gcloud secrets create ADMIN_CLIENT_SECRET_NAME --data-file=kafka_client_config.txt
将 ADMIN_CLIENT_SECRET_NAME 替换为 Kafka 身份验证 Secret 的名称。
配置扩缩
Kafka 自动扩缩器从 /scaler-config/scaling
卷中读取其扩缩配置。此卷的内容应设置为 YAML 格式。我们建议为此配置装载一个 Secret 卷。
创建一个名为 scaling_config.yaml
的文件,其中包含以下配置:
spec: scaleTargetRef: name: projects/PROJECT_ID/locations/REGION/workerpools/CONSUMER_SERVICE_NAME metrics: - type: Resource resource: name: cpu target: type: Utilization averageUtilization: TARGET_CPU_UTILIZATION activationThreshold: CPU_ACTIVATION_THRESHOLD tolerance: CPU_TOLERANCE windowSeconds: CPU_METRIC_WINDOW - type: External external: metric: name: consumer_lag target: type: AverageValue averageValue: LAG_THRESHOLD activationThreshold: LAG_ACTIVATION_THRESHOLD tolerance: LAG_TOLERANCE
替换以下内容:
- PROJECT_ID:要自动扩缩的 Kafka 使用方工作负载的项目 ID。
- REGION:要自动扩缩的 Kafka 使用方工作负载的区域。
- CONSUMER_SERVICE_NAME:要自动扩缩的 Kafka 使用方工作负载的名称。
- TARGET_CPU_UTILIZATION:用于自动扩缩计算的目标 CPU 利用率,例如:
60
。 - LAG_THRESHOLD:触发自动扩缩的
consumer_lag
指标的阈值,例如:1000
。 - (可选)CPU_ACTIVATION_THRESHOLD:CPU 的激活阈值。当所有指标都处于非活跃状态时,目标使用方会缩减至零。默认值为
0
。 - (可选)CPU_TOLERANCE:一个阈值,处于指定范围内时会阻止扩缩更改。以目标 CPU 利用率的百分比表示。默认值为
0.1
。 - (可选)CPU_METRIC_WINDOW:用于计算平均 CPU 利用率的时间段(以秒为单位)。默认值为
120
。 - (可选)LAG_ACTIVATION_THRESHOLD:
consumer_lag
指标的激活阈值。当所有指标都处于非活跃状态时,目标使用方会缩减至零。默认值为0
。 - (可选)LAG_TOLERANCE:一个阈值,处于指定范围内时会阻止扩缩更改。以目标使用方延迟时间的百分比表示。默认值为
0.1
。
(可选)您可以使用 behavior:
块配置高级扩缩属性。此块支持许多与 Kubernetes HPA 扩缩政策相同的属性。
如果您未指定 behavior
块,则系统会使用以下默认配置:
behavior: scaleDown: stabilizationWindowSeconds: 300 policies: - type: Percent value: 50 periodSeconds: 30 selectPolicy: Min scaleUp: stabilizationWindowSeconds: 0 policies: - type: Percent value: 100 periodSeconds: 15 - type: Instances value: 4 periodSeconds: 15 selectPolicy: Max
如需创建将在部署中装载的 Secret 卷,请将配置复制到名为 scaling_config.yaml
的文件中,然后运行以下命令:
gcloud secrets create SCALING_CONFIG_SECRET_NAME --data-file=scaling_config.yaml
将 SCALING_CONFIG_SECRET_NAME 替换为扩缩 Secret 的名称。
部署 Kafka 自动扩缩器
满足前提条件后,您就可以部署 Kafka 自动扩缩器服务及其配套基础设施了。我们提供了一个 Terraform 模块和一个 Shell 脚本来简化此过程。
gcloud
本部分将演示手动部署自动扩缩器所需的每个 gcloud 命令。在大多数情况下,我们建议改用 Shell 脚本或 Terraform 模块。
创建服务账号
服务账号要求取决于您配置的自动扩缩检查间隔。您可以将 Kafka 自动扩缩器配置为按灵活的间隔执行自动扩缩检查:
- 1 分钟或更长时间:Cloud Scheduler 会按所选的间隔通过 POST 请求触发自动扩缩检查。
不到 1 分钟:Cloud Scheduler 会根据配置的频率,每分钟触发多次 Cloud Tasks 创建操作。
1 分钟或更长时间
Kafka 自动扩缩器服务账号
为 Kafka 自动扩缩器创建服务账号:
gcloud iam service-accounts create SCALER_SERVICE_ACCOUNT
将 SCALER_SERVICE_ACCOUNT 替换为 Kafka 自动扩缩器服务账号的名称。
Kafka 自动扩缩器需要以下权限才能更新 Kafka 使用方实例的数量:
iam.serviceaccounts.actAs
(针对 Kafka 使用方服务账号)。roles/artifactregistry.reader
(针对包含 Kafka 使用方映像的仓库)。run.workerpools.get
和run.workerpools.update
。这些权限包含在 Cloud Run Admin 角色 (roles/run.admin
) 中。roles/secretmanager.secretAccessor
(针对扩缩和 Kafka 身份验证 Secret)。roles/monitoring.viewer
(针对 Kafka 使用方项目)。需要此角色才能读取 CPU 利用率指标。roles/monitoring.metricWriter
(针对 Kafka 使用方项目)。此角色是可选的,但它允许自动扩缩器发出自定义指标,以提高可观测性。
gcloud iam service-accounts add-iam-policy-binding CONSUMER_SERVICE_ACCOUNT_EMAIL \
--member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/iam.serviceAccountUser"
gcloud iam service-accounts add-iam-policy-binding CONSUMER_IMAGE_REPO \
--member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/artifactregistry.reader" \
--location=REPO_REGION
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/run.admin"
gcloud secrets add-iam-policy-binding ADMIN_CLIENT_SECRET_NAME \
--member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/secretmanager.secretAccessor"
gcloud secrets add-iam-policy-binding SCALING_CONFIG_SECRET_NAME \
--member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/secretmanager.secretAccessor"
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/monitoring.viewer" \
--condition=None
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/monitoring.metricWriter" \
--condition=None
替换以下内容:
- PROJECT_ID:Kafka 自动扩缩器服务所在的项目 ID。
- CONSUMER_SERVICE_ACCOUNT_EMAIL:Kafka 使用方的服务账号邮箱。例如
example@PROJECT-ID.iam.gserviceaccount.com
。 - SCALER_SERVICE_ACCOUNT:Kafka 自动扩缩器的服务账号。
- ADMIN_CLIENT_SECRET_NAME:Kafka 身份验证 Secret 的名称。
- SCALING_CONFIG_SECRET_NAME:扩缩 Secret 的名称。
- CONSUMER_IMAGE_REPO:包含 Kafka 使用方容器映像的仓库的 ID 或完全限定标识符。
- REPO_REGION:使用方映像仓库的位置。
不到 1 分钟
设置 Cloud Tasks
Cloud Scheduler 只能按 1 分钟或更长时间的间隔触发。对于不到 1 分钟的间隔,请使用 Cloud Tasks 触发 Kafka 自动扩缩器。设置 Cloud Tasks 需要执行以下操作:
- 为自动扩缩检查任务创建 Cloud Tasks 队列。
- 创建 Cloud Tasks 用于调用 Kafka 自动扩缩器的服务账号,并为其授予 Cloud Run Invoker 角色。
gcloud tasks queues create CLOUD_TASKS_QUEUE_NAME \
--location=REGION
gcloud iam service-accounts create TASKS_SERVICE_ACCOUNT
gcloud run services add-iam-policy-binding SCALER_SERVICE_NAME \
--member="serviceAccount:TASKS_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/run.invoker"
替换以下内容:
- CLOUD_TASKS_QUEUE_NAME:用于触发自动扩缩检查的已配置 Cloud Tasks 队列。
- TASKS_SERVICE_ACCOUNT:Cloud Tasks 应该用于触发自动扩缩检查的服务账号。
- SCALER_SERVICE_NAME:Kafka 自动扩缩器服务的名称。
- PROJECT_ID:Kafka 自动扩缩器服务的项目 ID。
- REGION:Kafka 自动扩缩器服务的位置。
设置 Kafka 自动扩缩器服务账号
为 Kafka 自动扩缩器创建服务账号:
gcloud iam service-accounts create SCALER_SERVICE_ACCOUNT
将 SCALER_SERVICE_ACCOUNT 替换为 Kafka 自动扩缩器服务账号的名称。
如需更新 Kafka 使用方实例的数量并为自动扩缩检查创建任务,Kafka 自动扩缩器需要以下权限:
iam.serviceaccounts.actAs
(针对 Kafka 使用方服务账号)。roles/artifactregistry.reader
(针对包含 Kafka 使用方映像的仓库)run.workerpools.get
和run.workerpools.update
。这些权限包含在 Cloud Run Admin 角色 (roles/run.admin
) 中。roles/secretmanager.secretAccessor
(针对用于扩缩和 Kafka 身份验证的两个 Secret)。roles/monitoring.viewer
(针对 Kafka 使用方项目)。需要此角色才能读取 CPU 利用率指标。roles/monitoring.metricWriter
(针对 Kafka 使用方项目)。此角色是可选的,但它允许自动扩缩器发出自定义指标,以提高可观测性。- Cloud Tasks Enqueuer 角色 (
roles/cloudtasks.enqueuer
)。
gcloud iam service-accounts add-iam-policy-binding CONSUMER_SERVICE_ACCOUNT_EMAIL \
--member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/iam.serviceAccountUser"
gcloud iam service-accounts add-iam-policy-binding CONSUMER_IMAGE_REPO \
--member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/artifactregistry.reader" \
--location=REPO_REGION
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/run.admin"
gcloud secrets add-iam-policy-binding ADMIN_CLIENT_SECRET_NAME \
--member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/secretmanager.secretAccessor"
gcloud secrets add-iam-policy-binding SCALING_CONFIG_SECRET_NAME \
--member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/secretmanager.secretAccessor"
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/monitoring.viewer" \
--condition=None
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/monitoring.metricWriter" \
--condition=None
gcloud tasks queues add-iam-policy-binding CLOUD_TASKS_QUEUE_NAME \
--member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/cloudtasks.enqueuer" \
--location=REGION
替换以下内容:
- PROJECT_ID:Kafka 自动扩缩器服务所在的项目 ID。
- CONSUMER_SERVICE_ACCOUNT_EMAIL:Kafka 使用方的服务账号邮箱。例如
example@PROJECT_ID.iam.gserviceaccount.com
。 - SCALER_SERVICE_ACCOUNT:Kafka 自动扩缩器的服务账号。
- CONSUMER_IMAGE_REPO:包含 Kafka 使用方容器映像的仓库的 ID 或完全限定标识符。
- ADMIN_CLIENT_SECRET_NAME:Kafka 身份验证 Secret 的名称。
- SCALING_CONFIG_SECRET_NAME:扩缩 Secret 的名称。
- REPO_REGION:使用方映像仓库的位置。
- CLOUD_TASKS_QUEUE_NAME:用于触发自动扩缩检查的已配置 Cloud Tasks 队列。
- REGION:Kafka 自动扩缩器服务的位置。
配置环境变量
1 分钟或更长时间
Kafka 自动扩缩器使用环境变量来指定 Kafka 使用方和目标工作负载的其他方面。为确保安全,我们建议您将敏感信息配置为 Secret。
创建一个名为 scaler_env_vars.yaml
的 YAML 文件,其中包含以下变量:
KAFKA_TOPIC_ID: KAFKA_TOPIC_ID CONSUMER_GROUP_ID: CONSUMER_GROUP_ID CYCLE_SECONDS: CYCLE_SECONDS OUTPUT_SCALER_METRICS: OUTPUT_SCALER_METRICS
替换以下内容:
- KAFKA_TOPIC_ID:Kafka 使用方订阅的主题 ID。
- CONSUMER_GROUP_ID:目标 Kafka 使用方所使用的使用方群组 ID。这些值必须一致,否则自动扩缩将失败。
- CYCLE_SECONDS:自动扩缩器周期(以秒为单位)。
- OUTPUT_SCALER_METRICS:用于启用指标的设置。将值设置为
true
可启用自定义指标输出,否则设置为false
。
不到 1 分钟
Kafka 自动扩缩器使用环境变量来指定 Kafka 使用方和目标工作负载的其他方面。为确保安全,我们建议您将敏感信息配置为 Secret。
创建一个名为 scaler_env_vars.yaml
的 YAML 文件,其中包含以下变量:
KAFKA_TOPIC_ID: KAFKA_TOPIC_ID CONSUMER_GROUP_ID: CONSUMER_GROUP_ID CYCLE_SECONDS: CYCLE_SECONDS OUTPUT_SCALER_METRICS: OUTPUT_SCALER_METRICS FULLY_QUALIFIED_CLOUD_TASKS_QUEUE_NAME: CLOUD_TASKS_QUEUE_NAME INVOKER_SERVICE_ACCOUNT_EMAIL: TASKS_SERVICE_ACCOUNT_EMAIL
替换以下内容:
- KAFKA_TOPIC_ID:Kafka 使用方订阅的主题 ID。
- CONSUMER_GROUP_ID:目标 Kafka 使用方所使用的使用方群组 ID。这些值必须一致,否则自动扩缩将失败。
- CYCLE_SECONDS:自动扩缩器周期(以秒为单位)。
- OUTPUT_SCALER_METRICS:用于启用指标的设置。将值设置为
true
可启用自定义指标输出,否则设置为false
。 - CLOUD_TASKS_QUEUE_NAME:用于触发自动扩缩检查的 Cloud Tasks 队列的完全限定名称。其格式如下:
projects/$PROJECT_ID/locations/$REGION/queues/$CLOUD_TASKS_QUEUE_NAME
。 - TASKS_SERVICE_ACCOUNT_EMAIL:Cloud Tasks 应该用于触发自动扩缩检查的服务账号。例如
example@PROJECT_ID.iam.gserviceaccount.com
。
使用提供的映像部署 Kafka 自动扩缩器,并通过 scaler_env_vars.yaml
文件和 Secret 卷装载连接到 Kafka VPC:
gcloud run deploy SCALER_SERVICE_NAME \
--image=SCALER_IMAGE_URI \
--env-vars-file=scaler_env_vars.yaml \
--service-account=SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com \
--no-allow-unauthenticated \
--network=KAFKA_VPC_NETWORK \
--subnet=KAFKA_VPC_SUBNET \
--update-secrets=/kafka-config/kafka-client-properties=ADMIN_CLIENT_SECRET_NAME:latest \
--update-secrets=/scaler-config/scaling=SCALING_CONFIG_SECRET_NAME:latest
--labels=created-by=kafka-autoscaler
替换以下内容:
- SCALER_IMAGE_URI:Kafka 自动扩缩器映像的 URI。
- SCALER_SERVICE_NAME:Kafka 自动扩缩器服务的名称。
- SCALER_SERVICE_ACCOUNT:Kafka 自动扩缩器服务账号的名称。
- PROJECT_ID:Kafka 自动扩缩器服务的项目 ID。
- KAFKA_VPC_NETWORK:连接到 Kafka 集群的 VPC 网络。
- KAFKA_VPC_SUBNET:连接到 Kafka 集群的 VPC 子网。
- ADMIN_CLIENT_SECRET_NAME:Kafka 身份验证 Secret 的名称。
- SCALING_CONFIG_SECRET_NAME:扩缩 Secret 的名称。
设置定期自动扩缩检查
在本部分中,您将使用 Cloud Scheduler 触发定期自动扩缩检查:
- 1 分钟或更长时间:将 Cloud Scheduler 配置为按所选间隔触发
- 不到 1 分钟:将 Cloud Scheduler 配置为每分钟触发一次
创建调用方服务账号
如需使 Cloud Scheduler 能够调用 Kafka 自动扩缩器,您必须创建一个对 Kafka 自动扩缩器服务具有调用方角色 (roles/run.invoker
) 的服务账号:
gcloud iam service-accounts create SCALER_INVOKER_SERVICE_ACCOUNT
gcloud run services add-iam-policy-binding SCALER_SERVICE_NAME \
--member="serviceAccount:SCALER_INVOKER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/run.invoker"
替换以下内容:
- SCALER_SERVICE_NAME:Kafka 自动扩缩器服务的名称。
- SCALER_INVOKER_SERVICE_ACCOUNT:调用方服务账号的名称。
- PROJECT_ID:Kafka 自动扩缩器服务的项目 ID。
创建 Cloud Scheduler 作业
1 分钟或更长时间
创建具有所选自动扩缩检查间隔的 Cloud Scheduler 作业:
gcloud scheduler jobs create http kafka-scaling-check \
--location=REGION \
--schedule="CRON_SCHEDULE" \
--time-zone="TIMEZONE" \
--uri=https://SCALER_SERVICE_NAME-PROJECT_NUMBER.REGION.run.app \
--oidc-service-account-email=SCALER_INVOKER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com \
--http-method=POST
替换以下内容:
- SCALER_SERVICE_NAME:Kafka 自动扩缩器服务的名称。
- SCALER_INVOKER_SERVICE_ACCOUNT:调用方服务账号的名称。
- PROJECT_ID:项目 ID 或 Kafka 自动扩缩器服务。
- PROJECT_NUMBER:Kafka 自动扩缩器服务的项目编号。
- REGION:Kafka 自动扩缩器服务的位置。
- TIMEZONE:时区,例如:
America/Los_Angeles
。 - CRON_SCHEDULE:以 Crontab 格式表示的所选时间表。例如,对于每分钟为
"* * * * *"
。
不到 1 分钟
创建每分钟执行一次的 Cloud Scheduler 作业:
gcloud scheduler jobs create http kafka-scaling-check \
--location=REGION \
--schedule="* * * * *" \
--time-zone="TIMEZONE" \
--uri=https://SCALER_SERVICE_NAME-PROJECT_NUMBER.REGION.run.app \
--oidc-service-account-email=SCALER_INVOKER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com \
--http-method=POST
替换以下内容:
- SCALER_SERVICE_NAME:Kafka 自动扩缩器服务的名称。
- SCALER_INVOKER_SERVICE_ACCOUNT:调用方服务账号的名称。
- PROJECT_ID:Kafka 自动扩缩器服务的项目 ID。
- PROJECT_NUMBER:Kafka 自动扩缩器服务的项目编号。
- REGION:Kafka 自动扩缩器服务的位置。
- TIMEZONE:时区,例如:
America/Los_Angeles
。
terraform
terraform/
目录包含一个可重复使用的 Terraform 模块,您可以使用该模块来预配 Kafka 自动扩缩器及其关联资源。
此模块可自动创建以下内容:
- Kafka 自动扩缩器 Cloud Run 服务
- 支持服务账号和 IAM 绑定
- Cloud Tasks 队列
- Cloud Scheduler 作业
如需详细说明、用法示例以及所有输入/输出变量的说明,请参阅 terraform readme
。
您需要向 Terraform 模块提供必需的变量,包括前提条件中的详细信息,例如项目 ID、区域、使用方 SA 邮箱、Secret 名称、扩缩器映像路径和主题 ID。
shell
自动扩缩器随附一个 setup_kafka_scaler.sh
脚本,用于自动创建和配置所有必需的资源。
设置环境变量
在运行该脚本之前,请确保您已设置所有必需的环境变量:
# Details for already-deployed Kafka consumer
export PROJECT_ID=PROJECT_ID
export REGION=REGION
export CONSUMER_SERVICE_NAME=DEPLOYED_KAFKA_CONSUMER
export CONSUMER_SA_EMAIL=KAFKA_CONSUMER_ACCOUNT_EMAIL # For example, NAME@PROJECT_ID.iam.gserviceaccount.com
export TOPIC_ID=KAFKA_TOPIC_ID
export CONSUMER_GROUP_ID=KAFKA_CONSUMER_GROUP_ID
export NETWORK=VPC_NETWORK
export SUBNET=VPC_SUBNET
# Details for new items to be created during this setup
export CLOUD_TASKS_QUEUE_NAME=CLOUD_TASKS_QUEUE_FOR_SCALING_CHECKS
export TASKS_SERVICE_ACCOUNT=TASKS_SERVICE_ACCOUNT_NAME
export SCALER_SERVICE_NAME=KAFKA_AUTOSCALER_SERVICE_NAME
export SCALER_IMAGE_PATH=KAFKA_AUTOSCALER_IMAGE_URI
export SCALER_CONFIG_SECRET=KAFKA_AUTOSCALER_CONFIG_SECRET_NAME
export CYCLE_SECONDS=SCALER_CHECK_FREQUENCY # For example, 15; this value should be at least 5 seconds.
export OUTPUT_SCALER_METRICS=false # If you want scaling metrics to outputted to Cloud Monitoring set this to true and ensure your scaler service account has permission to write metrics (for example, via roles/monitoring.metricWriter).
替换以下内容:
- PROJECT_ID:Kafka 自动扩缩器服务所在的项目 ID。
- REGION:Kafka 自动扩缩器服务的位置。
- DEPLOYED_KAFKA_CONSUMER:Kafka 使用方名称。
- KAFKA_CONSUMER_ACCOUNT_EMAIL:Kafka 使用方的服务账号邮箱。
- KAFKA_TOPIC_ID:Kafka 使用方订阅的主题 ID。
- KAFKA_CONSUMER_GROUP_ID:目标 Kafka 使用方所使用的使用方群组 ID。这些值必须一致,否则自动扩缩将失败。
- VPC_NETWORK:连接到 Kafka 集群的 VPC 网络。
- VPC_SUBNET:连接到 Kafka 集群的 VPC 子网。
- CLOUD_TASKS_QUEUE_FOR_SCALING_CHECKS:用于触发自动扩缩检查的已配置 Cloud Tasks 队列。
- TASKS_SERVICE_ACCOUNT_NAME:Cloud Tasks 应该用于触发自动扩缩检查的服务账号。
- KAFKA_AUTOSCALER_SERVICE_NAME:Kafka 自动扩缩器服务的名称。
- KAFKA_AUTOSCALER_IMAGE_URI:Kafka 自动扩缩器映像的 URI。
- KAFKA_AUTOSCALER_CONFIG_SECRET_NAME:扩缩 Secret 的名称。
- SCALER_CHECK_FREQUENCY:自动扩缩器周期(以秒为单位)。
运行设置脚本
执行提供的 setup_kafka_scaler.sh
脚本:
./setup_kafka_scaler.sh
该脚本会执行以下操作:
- 创建用于触发自动扩缩检查的 Cloud Tasks 队列。
- 创建 Kafka 自动扩缩器服务账号,并授予必需的权限。
- 配置并部署 Kafka 自动扩缩器。
- 创建定期触发自动扩缩检查的 Cloud Scheduler 作业。
当 setup_kafka_scaler.sh
脚本运行时,它会输出已配置的环境变量。请先验证环境变量是否正确,然后再继续。
授予其他权限
如需更改 Kafka 使用方的实例数量,Kafka 自动扩缩器服务账号必须对已部署的容器映像具有查看权限。例如,如果使用方映像是从 Artifact Registry 部署的,请运行以下命令:
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$SCALER_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/artifactregistry.reader" # Or appropriate role for your registry
验证 Kafka 自动扩缩是否正常运行
向 Kafka 自动扩缩器服务网址 (SCALER_SERVICE_NAME-
PROJECT_NUMBER.
REGION.run.app
) 发出的请求会触发该服务的扩缩操作。
您可以向 Kafka 自动扩缩器服务发送 POST
请求,以触发自动扩缩计算:
curl -X POST -H "Authorization: Bearer $(gcloud auth print-identity-token)" https://SCALER_SERVICE_NAME-PROJECT_NUMBER.REGION.run.app
替换以下内容:
- SCALER_SERVICE_NAME:Kafka 自动扩缩器服务的名称。
- PROJECT_NUMBER:Kafka 自动扩缩器服务的项目编号。
- REGION:Kafka 自动扩缩器服务的位置。
POST
请求会触发自动扩缩计算,将结果输出到日志记录,并根据建议更改实例数量。
Kafka 自动扩缩器服务的日志应包含类似于 [SCALING] Recommended instances X
的消息。
如果启用了 OUTPUT_SCALER_METRICS
标志,您还可以在 custom.googleapis.com/cloud-run-kafkascaler
下找到缩放器 Cloud Monitoring 指标。
高级扩缩配置
spec: metrics: behavior: scaleDown: stabilizationWindowSeconds: [INT] policies: - type: [Percent, Instances] value: [INT] periodSeconds: [INT] selectPolicy: [Min, Max] scaleUp: stabilizationWindowSeconds: [INT] policies: - type: [Percent, Instances] value: [INT] periodSeconds: [INT] selectPolicy: [Min, Max]
以下列表介绍了上述部分元素:
scaleDown
:减少实例数量(缩容)时的行为。scaleUp
:增加实例数量(扩容)时的行为。stabilizationWindowSeconds
:滚动式时间段内计算出的实例数量的最高值 (scaleDown
) 或最低值 (scaleUp
)。将值设置为0
表示使用最近一次计算的值。selectPolicy
:配置了多项政策时进行强制执行的结果。Min
:最小更改Max
:最大更改Percent
:每个周期的更改数量限制为所配置的实例总数百分比。Instances
:每个周期的更改数量限制为所配置的实例数量。periodSeconds
:强制执行政策的时长。
例如,使用默认配置的完整规范如下所示:
spec: scaleTargetRef: name: projects/PROJECT-ID/locations/us-central1/workerpools/kafka-consumer-worker metrics: - type: Resource resource: name: cpu target: type: Utilization averageUtilization: 60 activationThreshold: 0 tolerance: 0.1 windowSeconds: 120 - type: External external: metric: name: consumer_lag target: type: AverageValue averageValue: 1000 activationThreshold: 0 tolerance: 0.1 behavior: scaleDown: stabilizationWindowSeconds: 300 policies: - type: Percent value: 50 periodSeconds: 30 selectPolicy: Min scaleUp: stabilizationWindowSeconds: 0 policies: - type: Percent value: 100 periodSeconds: 15 - type: Instances value: 4 periodSeconds: 15 selectPolicy: Max