Escalonar automaticamente as cargas de trabalho de consumidores do Kafka

Neste tutorial, mostramos como configurar e implantar um escalonador automático do Kafka como um serviço do Cloud Run. Esse escalonador automático realiza a lógica de escalonamento para uma carga de trabalho de consumidor do Kafka, como uma implantação de pool de workers do Cloud Run. O escalonador automático do Kafka lê métricas do seu cluster do Kafka e usa o escalonamento manual para um pool de trabalhadores ou serviço do Cloud Run para escalonar uma carga de trabalho de consumidor do Kafka com base na métrica de atraso do consumidor do Kafka.

O diagrama a seguir mostra como um serviço de escalonador automático do Kafka lê métricas de um cluster do Kafka para escalonar automaticamente um pool de workers consumidores do Kafka.

Um serviço de escalonamento automático do Kafka extrai métricas do Kafka e faz o escalonamento automático de um consumidor do Kafka.

Funções exigidas

Para receber as permissões necessárias para implantar e executar esse serviço, peça ao administrador para conceder a você os seguintes papéis do IAM:

Antes de começar

Para configurar e usar o escalonador automático do Kafka, você precisa dos seguintes recursos:

  • Cluster Kafka
  • Consumidor implantado

Cluster Kafka

Consumidor do Cloud Run implantado

  • Uma carga de trabalho de consumidor do Kafka precisa ser implantada no Cloud Run como um serviço ou um pool de workers. Ele precisa ser configurado para se conectar ao cluster, tópico e grupo de consumidores do Kafka. Para um exemplo de consumidor do Kafka, consulte Exemplo de consumidor do escalonador automático do Kafka no Cloud Run.
  • Sua carga de trabalho do consumidor precisa estar no mesmo projeto Google Cloud do cluster do Kafka.

Práticas recomendadas

  • Conecte seus consumidores do Kafka à rede VPC usando a VPC direta. Com a VPC direta, é possível se conectar ao cluster do Kafka usando endereços IP particulares e manter o tráfego na rede VPC.
  • Configure uma verificação de integridade de atividade para seus consumidores do Kafka que verifique se o consumidor está extraindo eventos. A verificação de integridade ajuda a garantir que as instâncias não íntegras sejam reiniciadas automaticamente se pararem de processar eventos, mesmo que o contêiner não falhe.

Criar o escalonador automático do Kafka

Use o Cloud Build para criar uma imagem de contêiner do escalonador automático do Kafka com base no código-fonte.

  1. Clone o repositório:

    git clone https://github.com/GoogleCloudPlatform/cloud-run-kafka-scaler.git
    
  2. Navegue até a pasta do repositório:

    cd cloud-run-kafka-scaler
    

Para especificar o nome da imagem de saída, atualize %ARTIFACT_REGISTRY_IMAGE% no arquivo cloudbuild.yaml incluído. Por exemplo: us-central1-docker.pkg.dev/my-project/my-repo/my_kafka_autoscaler.

gcloud builds submit --tag us-central1-docker.pkg.dev/my-project/my-repo/my_kafka_autoscaler

Esse comando cria a imagem do contêiner e a envia para o Artifact Registry. Registre o caminho completo da imagem (SCALER_IMAGE_PATH), porque você vai precisar dele mais tarde.

A imagem resultante não será executada localmente. Ele foi projetado para ser colocado sobre uma imagem de base Java. Para mais informações, incluindo como remontar a imagem do contêiner para execução local, consulte Configurar atualizações automáticas da imagem de base.

Definir a configuração do escalonador automático do Kafka

É possível configurar o escalonador automático do Kafka usando secrets. O escalonador automático atualiza a configuração periodicamente. Isso significa que você pode enviar novas versões de chaves secretas para mudar a configuração sem precisar fazer uma nova implantação do escalonador automático.

Configurar propriedades do cliente Kafka

É possível configurar a conexão com a API Admin do Kafka montando um secret como um volume ao implantar o escalonador automático do Kafka.

Crie um arquivo chamado kafka_client_config.txt e inclua as propriedades de configuração do cliente administrador do Kafka que você quer adicionar. A propriedade bootstrap.servers é obrigatória:

bootstrap.servers=BOOTSTRAP_SERVER_LIST

Substitua BOOTSTRAP_SERVER_LIST pela lista HOST:PORT do cluster do Kafka.

Configurar a autenticação do Kafka

Se o servidor Kafka exigir autenticação, inclua as propriedades de configuração necessárias no arquivo kafka_client_config.txt. Por exemplo, para se conectar a um cluster do Managed Service para Apache Kafka usando credenciais padrão do aplicativo com o Google OAuth, esse secret precisa incluir as seguintes propriedades:

bootstrap.servers=BOOTSTRAP_SERVER_LIST
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;

Substitua BOOTSTRAP_SERVER_LIST pela lista HOST:PORT do cluster do Kafka.

Usar credenciais padrão do aplicativo com um cluster do Serviço gerenciado para Apache Kafka também exige conceder o papel Cliente do Kafka gerenciado (roles/managedkafka.client) à conta de serviço do escalonador automático do Kafka:

gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/managedkafka.client"

Substitua:

  • SCALER_SERVICE_ACCOUNT: o nome da conta de serviço do escalonador automático do Kafka.
  • PROJECT_ID: o ID do projeto para o serviço de escalonamento automático do Kafka.

Para criar o secret, que será montado como um volume na implantação, use o arquivo kafka_client_config.txt:

gcloud secrets create ADMIN_CLIENT_SECRET_NAME --data-file=kafka_client_config.txt

Substitua ADMIN_CLIENT_SECRET_NAME pelo nome do secret de autenticação do Kafka.

Configurar o escalonamento

O escalonador automático do Kafka lê a configuração de escalonamento do volume /scaler-config/scaling. O conteúdo desse volume precisa ser formatado como YAML. Recomendamos montar um volume secreto para essa configuração.

Crie um arquivo chamado scaling_config.yaml com a seguinte configuração:

spec:
  scaleTargetRef:
    name: projects/PROJECT_ID/locations/REGION/workerpools/CONSUMER_SERVICE_NAME
 metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: TARGET_CPU_UTILIZATION
        activationThreshold: CPU_ACTIVATION_THRESHOLD
        tolerance: CPU_TOLERANCE
        windowSeconds: CPU_METRIC_WINDOW
  - type: External
    external:
      metric:
        name: consumer_lag
      target:
        type: AverageValue
        averageValue: LAG_THRESHOLD
        activationThreshold: LAG_ACTIVATION_THRESHOLD
        tolerance: LAG_TOLERANCE

Substitua:

  • PROJECT_ID: o ID do projeto da carga de trabalho do consumidor do Kafka que será escalonada automaticamente.
  • REGION: a região da carga de trabalho do consumidor do Kafka que será escalonada automaticamente.
  • CONSUMER_SERVICE_NAME: o nome da carga de trabalho do consumidor do Kafka a ser escalonada automaticamente.
  • TARGET_CPU_UTILIZATION: a meta de uso da CPU para cálculos de escalonamento automático. Por exemplo: 60.
  • LAG_THRESHOLD: o limite da métrica consumer_lag para acionar o escalonamento automático. Por exemplo: 1000.
  • (Opcional) CPU_ACTIVATION_THRESHOLD: o limite de ativação da CPU. Quando todas as métricas estão inativas, o consumidor de destino é dimensionado para zero. O padrão é 0.
  • (Opcional) CPU_TOLERANCE: um limite que impede mudanças de escalonamento se estiver dentro do intervalo especificado. Expressa como uma porcentagem do uso pretendido de CPU. O padrão é 0.1.
  • (Opcional) CPU_METRIC_WINDOW: um período de tempo, em segundos, em que o uso médio da CPU é calculado. O padrão é 120.
  • (Opcional) LAG_ACTIVATION_THRESHOLD: o limite de ativação da métrica consumer_lag. Quando todas as métricas estão inativas, o consumidor de destino é dimensionado para zero. O padrão é 0.
  • (Opcional) LAG_TOLERANCE: um limite que impede mudanças de escalonamento se estiver dentro do intervalo especificado. Expressa como uma porcentagem do atraso do consumidor desejado. O padrão é 0.1.

Se quiser, configure propriedades de escalonamento avançadas usando um bloco behavior:. Esse bloco é compatível com muitas das mesmas propriedades das políticas de escalonamento do HPA do Kubernetes.

Se você não especificar um bloco behavior, a seguinte configuração padrão será usada:

behavior:
  scaleDown:
    stabilizationWindowSeconds: 300
    policies:
    - type: Percent
      value: 50
      periodSeconds: 30
    selectPolicy: Min
  scaleUp:
    stabilizationWindowSeconds: 0
    policies:
    - type: Percent
      value: 100
      periodSeconds: 15
    - type: Instances
      value: 4
      periodSeconds: 15
    selectPolicy: Max

Para criar o volume de secret, que será montado em deployment, copie a configuração em um arquivo chamado scaling_config.yaml e execute o seguinte:

gcloud secrets create SCALING_CONFIG_SECRET_NAME --data-file=scaling_config.yaml

Substitua SCALING_CONFIG_SECRET_NAME pelo nome do secret de escalonamento automático.

Implantar o escalonador automático do Kafka

Depois de concluir os pré-requisitos, você poderá implantar o serviço de escalonamento automático do Kafka e a infraestrutura de suporte dele. Um módulo do Terraform e um script de shell são fornecidos para simplificar esse processo.

gcloud

Esta seção explica cada comando gcloud necessário para implantar manualmente o autoescalador. Na maioria dos casos, recomendamos usar o script shell ou o módulo do Terraform.

Criar uma conta de serviço

Os requisitos da conta de serviço dependem do intervalo de verificação do escalonamento automático configurado. É possível configurar o autoescalador do Kafka para realizar verificações de escalonamento automático em intervalos flexíveis:

  • Um minuto ou mais: o Cloud Scheduler aciona a verificação do escalonamento automático com uma solicitação POST no intervalo selecionado.
  • Menos de um minuto: o Cloud Scheduler aciona a criação de várias tarefas do Cloud a cada minuto, com base na frequência configurada.

Um ou mais minutos

Conta de serviço do escalonador automático do Kafka

Crie uma conta de serviço para o escalonador automático do Kafka:

gcloud iam service-accounts create SCALER_SERVICE_ACCOUNT

Substitua SCALER_SERVICE_ACCOUNT pelo nome da conta de serviço do escalonador automático do Kafka.

O autoescalador do Kafka precisa das seguintes permissões para atualizar o número de instâncias de consumidor do Kafka:

  • iam.serviceaccounts.actAs para a conta de serviço do consumidor do Kafka.
  • roles/artifactregistry.reader para o repositório que contém a imagem do consumidor do Kafka.
  • run.workerpools.get e run.workerpools.update. Essas permissões estão incluídas no papel de Administrador do Cloud Run (roles/run.admin).
  • roles/secretmanager.secretAccessor para os secrets de escalonamento e autenticação do Kafka.
  • roles/monitoring.viewer para o projeto do consumidor do Kafka. Essa função é necessária para ler as métricas de utilização da CPU.
  • roles/monitoring.metricWriter para o projeto do consumidor do Kafka. Essa função é opcional, mas permite que o escalonador automático emita métricas personalizadas para melhorar a capacidade de observação.
gcloud iam service-accounts add-iam-policy-binding CONSUMER_SERVICE_ACCOUNT_EMAIL \
    --member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/iam.serviceAccountUser"

gcloud iam service-accounts add-iam-policy-binding CONSUMER_IMAGE_REPO \
    --member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/artifactregistry.reader" \
    --location=REPO_REGION

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/run.admin"

gcloud secrets add-iam-policy-binding ADMIN_CLIENT_SECRET_NAME \
  --member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
  --role="roles/secretmanager.secretAccessor"

gcloud secrets add-iam-policy-binding SCALING_CONFIG_SECRET_NAME \
  --member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
  --role="roles/secretmanager.secretAccessor"

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/monitoring.viewer" \
    --condition=None

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/monitoring.metricWriter" \
    --condition=None

Substitua:

  • PROJECT_ID: o ID do projeto em que o serviço de escalonamento automático do Kafka está localizado.
  • CONSUMER_SERVICE_ACCOUNT_EMAIL: o e-mail da conta de serviço para o consumidor do Kafka. Por exemplo, example@PROJECT-ID.iam.gserviceaccount.com.
  • SCALER_SERVICE_ACCOUNT: a conta de serviço do escalonador automático do Kafka.
  • ADMIN_CLIENT_SECRET_NAME: o nome do secret de autenticação do Kafka.
  • SCALING_CONFIG_SECRET_NAME: o nome do secret de escalonamento.
  • CONSUMER_IMAGE_REPO: o ID ou identificador totalmente qualificado do repositório com a imagem do contêiner para o consumidor do Kafka.
  • REPO_REGION: o local do repositório de imagens do consumidor.

Menos de um minuto

Configurar o Cloud Tasks

O Cloud Scheduler só pode ser acionado em intervalos de um minuto ou mais. Para intervalos menores que um minuto, use o Cloud Tasks para acionar o escalonador automático do Kafka. Para configurar o Cloud Tasks, é necessário:

  • Criar a fila do Cloud Tasks para as tarefas de verificação do escalonamento automático.
  • Criar a conta de serviço que o Cloud Tasks usa para invocar o escalonador automático do Kafka com o papel de Invocador do Cloud Run.
gcloud tasks queues create CLOUD_TASKS_QUEUE_NAME \
--location=REGION
gcloud iam service-accounts create TASKS_SERVICE_ACCOUNT
gcloud run services add-iam-policy-binding SCALER_SERVICE_NAME \
    --member="serviceAccount:TASKS_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/run.invoker"

Substitua:

  • CLOUD_TASKS_QUEUE_NAME: a fila configurada do Cloud Tasks para acionar verificações de escalonamento automático.
  • TASKS_SERVICE_ACCOUNT: a conta de serviço que o Cloud Tasks deve usar para acionar verificações de escalonamento automático.
  • SCALER_SERVICE_NAME: o nome do seu serviço de escalonador automático do Kafka.
  • PROJECT_ID: o ID do projeto para o serviço de escalonamento automático do Kafka.
  • REGION: o local do serviço de escalonador automático do Kafka.

Configurar a conta de serviço do autoescalador do Kafka

Crie uma conta de serviço para o escalonador automático do Kafka:

gcloud iam service-accounts create SCALER_SERVICE_ACCOUNT

Substitua SCALER_SERVICE_ACCOUNT pelo nome da conta de serviço do escalonador automático do Kafka.

Para atualizar o número de instâncias de consumidor do Kafka e criar tarefas para verificações de escalonamento automático, o escalonador automático do Kafka precisa das seguintes permissões:

  • iam.serviceaccounts.actAs para a conta de serviço do consumidor do Kafka.
  • roles/artifactregistry.reader para o repositório que contém a imagem do consumidor do Kafka
  • run.workerpools.get e run.workerpools.update. Essas permissões estão incluídas no papel de Administrador do Cloud Run (roles/run.admin).
  • roles/secretmanager.secretAccessor para os dois secrets de escalonamento e autenticação do Kafka.
  • roles/monitoring.viewer para o projeto do consumidor do Kafka. Essa função é necessária para ler as métricas de utilização da CPU.
  • roles/monitoring.metricWriter para o projeto do consumidor do Kafka. Essa função é opcional, mas permite que o escalonador automático emita métricas personalizadas para melhorar a capacidade de observação.
  • Função Enfileirador do Cloud Tasks (roles/cloudtasks.enqueuer).
gcloud iam service-accounts add-iam-policy-binding CONSUMER_SERVICE_ACCOUNT_EMAIL \
    --member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/iam.serviceAccountUser"

gcloud iam service-accounts add-iam-policy-binding CONSUMER_IMAGE_REPO \
    --member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/artifactregistry.reader" \
    --location=REPO_REGION

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/run.admin"

gcloud secrets add-iam-policy-binding ADMIN_CLIENT_SECRET_NAME \
  --member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
  --role="roles/secretmanager.secretAccessor"

gcloud secrets add-iam-policy-binding SCALING_CONFIG_SECRET_NAME \
  --member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
  --role="roles/secretmanager.secretAccessor"

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/monitoring.viewer" \
    --condition=None

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/monitoring.metricWriter" \
    --condition=None

gcloud tasks queues add-iam-policy-binding CLOUD_TASKS_QUEUE_NAME \
    --member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/cloudtasks.enqueuer" \
    --location=REGION

Substitua:

  • PROJECT_ID: o ID do projeto em que o serviço de escalonamento automático do Kafka está localizado.
  • CONSUMER_SERVICE_ACCOUNT_EMAIL: o e-mail da conta de serviço para o consumidor do Kafka. Por exemplo, example@PROJECT_ID.iam.gserviceaccount.com.
  • SCALER_SERVICE_ACCOUNT: a conta de serviço do escalonador automático do Kafka.
  • CONSUMER_IMAGE_REPO: o ID ou o identificador totalmente qualificado do repositório com a imagem do contêiner para o consumidor do Kafka.
  • ADMIN_CLIENT_SECRET_NAME: o nome do secret de autenticação do Kafka.
  • SCALING_CONFIG_SECRET_NAME: o nome do secret de escalonamento.
  • REPO_REGION: o local do repositório de imagens do consumidor.
  • CLOUD_TASKS_QUEUE_NAME: a fila configurada do Cloud Tasks para acionar verificações de escalonamento automático.
  • REGION: o local do serviço de escalonador automático do Kafka.

Configure as variáveis de ambiente

Um ou mais minutos

O autoescalador do Kafka usa variáveis de ambiente para especificar o consumidor do Kafka e outros aspectos da carga de trabalho de destino. Por segurança, recomendamos que você configure informações sensíveis como secrets.

Crie um arquivo YAML chamado scaler_env_vars.yaml com as seguintes variáveis:

KAFKA_TOPIC_ID: KAFKA_TOPIC_ID
CONSUMER_GROUP_ID: CONSUMER_GROUP_ID
CYCLE_SECONDS: CYCLE_SECONDS
OUTPUT_SCALER_METRICS: OUTPUT_SCALER_METRICS

Substitua:

  • KAFKA_TOPIC_ID: o ID do tópico a que os consumidores do Kafka se inscrevem.
  • CONSUMER_GROUP_ID: o ID do grupo de consumidores usado pelo consumidor do Kafka de destino. Esses valores precisam corresponder, caso contrário, o escalonamento automático vai falhar.
  • CYCLE_SECONDS: o período do ciclo do escalonador automático, em segundos.
  • OUTPUT_SCALER_METRICS: a configuração para ativar as métricas. Defina o valor como true para ativar a saída de métricas personalizadas ou como false caso contrário.

Menos de um minuto

O autoescalador do Kafka usa variáveis de ambiente para especificar o consumidor do Kafka e outros aspectos da carga de trabalho de destino. Por segurança, recomendamos que você configure informações sensíveis como secrets.

Crie um arquivo YAML chamado scaler_env_vars.yaml com as seguintes variáveis:

KAFKA_TOPIC_ID: KAFKA_TOPIC_ID
CONSUMER_GROUP_ID: CONSUMER_GROUP_ID
CYCLE_SECONDS: CYCLE_SECONDS
OUTPUT_SCALER_METRICS: OUTPUT_SCALER_METRICS
FULLY_QUALIFIED_CLOUD_TASKS_QUEUE_NAME: CLOUD_TASKS_QUEUE_NAME
INVOKER_SERVICE_ACCOUNT_EMAIL: TASKS_SERVICE_ACCOUNT_EMAIL

Substitua:

  • KAFKA_TOPIC_ID: o ID do tópico a que os consumidores do Kafka se inscrevem.
  • CONSUMER_GROUP_ID: o ID do grupo de consumidores usado pelo consumidor do Kafka de destino. Esses valores precisam corresponder, caso contrário, o escalonamento automático vai falhar.
  • CYCLE_SECONDS: o período do ciclo do escalonador automático, em segundos.
  • OUTPUT_SCALER_METRICS: a configuração para ativar as métricas. Defina o valor como true para ativar a saída de métricas personalizadas ou false caso contrário.
  • CLOUD_TASKS_QUEUE_NAME: o nome totalmente qualificado da fila do Cloud Tasks para acionar verificações de escalonamento automático. Ele tem o seguinte formato: projects/$PROJECT_ID/locations/$REGION/queues/$CLOUD_TASKS_QUEUE_NAME.
  • TASKS_SERVICE_ACCOUNT_EMAIL: a conta de serviço que o Cloud Tasks deve usar para acionar verificações de escalonamento automático. Por exemplo, example@PROJECT_ID.iam.gserviceaccount.com.

Implante o escalonador automático do Kafka usando a imagem fornecida e conecte-se à VPC do Kafka com o arquivo scaler_env_vars.yaml e as montagens de volume secreto:

gcloud run deploy SCALER_SERVICE_NAME \
    --image=SCALER_IMAGE_URI \
    --env-vars-file=scaler_env_vars.yaml \
    --service-account=SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com \
    --no-allow-unauthenticated \
    --network=KAFKA_VPC_NETWORK \
    --subnet=KAFKA_VPC_SUBNET \
    --update-secrets=/kafka-config/kafka-client-properties=ADMIN_CLIENT_SECRET_NAME:latest \
    --update-secrets=/scaler-config/scaling=SCALING_CONFIG_SECRET_NAME:latest
    --labels=created-by=kafka-autoscaler

Substitua:

  • SCALER_IMAGE_URI: o URI da imagem do escalonador automático do Kafka.
  • SCALER_SERVICE_NAME: o nome do seu serviço de escalonador automático do Kafka.
  • SCALER_SERVICE_ACCOUNT: o nome da conta de serviço do escalonador automático do Kafka.
  • PROJECT_ID: o ID do projeto para o serviço de escalonamento automático do Kafka.
  • KAFKA_VPC_NETWORK: a rede VPC conectada ao cluster do Kafka.
  • KAFKA_VPC_SUBNET: a sub-rede da VPC conectada ao cluster do Kafka.
  • ADMIN_CLIENT_SECRET_NAME: o nome do secret de autenticação do Kafka.
  • SCALING_CONFIG_SECRET_NAME: o nome do secret de escalonamento.

Configurar verificações periódicas de escalonamento automático

Nesta seção, você usará o Cloud Scheduler para acionar verificações periódicas de escalonamento automático:

  • Um minuto ou mais: configure o Cloud Scheduler para acionar no intervalo selecionado.
  • Menos de um minuto: configure o Cloud Scheduler para acionar a cada minuto
Criar conta de serviço do invocador

Para permitir que o Cloud Scheduler chame o escalonador automático do Kafka, crie uma conta de serviço com a função de invocador (roles/run.invoker) no serviço de escalonador automático do Kafka:

gcloud iam service-accounts create SCALER_INVOKER_SERVICE_ACCOUNT
gcloud run services add-iam-policy-binding SCALER_SERVICE_NAME \
  --member="serviceAccount:SCALER_INVOKER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
  --role="roles/run.invoker"

Substitua:

  • SCALER_SERVICE_NAME: o nome do seu serviço de escalonador automático do Kafka.
  • SCALER_INVOKER_SERVICE_ACCOUNT: o nome da conta de serviço do invocador.
  • PROJECT_ID: o ID do projeto para o serviço de escalonamento automático do Kafka.
Criar o job do Cloud Scheduler

Um ou mais minutos

Crie um job do Cloud Scheduler com o intervalo de verificação de escalonamento automático selecionado:

gcloud scheduler jobs create http kafka-scaling-check \
    --location=REGION \
    --schedule="CRON_SCHEDULE" \
    --time-zone="TIMEZONE" \
    --uri=https://SCALER_SERVICE_NAME-PROJECT_NUMBER.REGION.run.app \
    --oidc-service-account-email=SCALER_INVOKER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com \
    --http-method=POST

Substitua:

  • SCALER_SERVICE_NAME: o nome do seu serviço de escalonador automático do Kafka.
  • SCALER_INVOKER_SERVICE_ACCOUNT: o nome da conta de serviço do invocador.
  • PROJECT_ID: o ID do projeto ou o serviço de escalonamento automático do Kafka.
  • PROJECT_NUMBER: o número do projeto do serviço de escalonador automático do Kafka.
  • REGION: o local do serviço de escalonador automático do Kafka.
  • TIMEZONE: o fuso horário, por exemplo, America/Los_Angeles.
  • CRON_SCHEDULE: a programação selecionada no formato crontab. Por exemplo, a cada minuto: "* * * * *".

Menos de um minuto

Crie um job do Cloud Scheduler que seja executado a cada minuto:

gcloud scheduler jobs create http kafka-scaling-check \
    --location=REGION \
    --schedule="* * * * *" \
    --time-zone="TIMEZONE" \
    --uri=https://SCALER_SERVICE_NAME-PROJECT_NUMBER.REGION.run.app \
    --oidc-service-account-email=SCALER_INVOKER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com \
    --http-method=POST

Substitua:

  • SCALER_SERVICE_NAME: o nome do seu serviço de escalonador automático do Kafka.
  • SCALER_INVOKER_SERVICE_ACCOUNT: o nome da conta de serviço do invocador.
  • PROJECT_ID: o ID do projeto do serviço de escalonamento automático do Kafka.
  • PROJECT_NUMBER: o número do projeto do serviço de escalonador automático do Kafka.
  • REGION: o local do serviço de escalonador automático do Kafka.
  • TIMEZONE: o fuso horário, por exemplo, America/Los_Angeles.

terraform

O diretório terraform/ contém um módulo Terraform reutilizável que pode ser usado para provisionar o escalonador automático do Kafka e os recursos associados.

Este módulo automatiza a criação de:

  • O serviço do Cloud Run de escalonamento automático do Kafka
  • Suporte a contas de serviço e vinculações do IAM
  • Fila do Cloud Tasks
  • Job do Cloud Scheduler

Para instruções detalhadas, exemplos de uso e descrições de todas as variáveis de entrada/saída, consulte terraform readme.

Você precisa fornecer as variáveis necessárias ao módulo do Terraform, incluindo detalhes dos pré-requisitos, como ID do projeto, região, e-mail da SA do consumidor, nomes secretos, caminho da imagem do escalonador e ID do tópico.

shell

Um script setup_kafka_scaler.sh é fornecido com o escalonador automático para criar e configurar automaticamente todos os recursos necessários.

Defina as variáveis de ambiente

Antes de executar o script, verifique se você definiu todas as variáveis de ambiente necessárias:

# Details for already-deployed Kafka consumer
export PROJECT_ID=PROJECT_ID
export REGION=REGION
export CONSUMER_SERVICE_NAME=DEPLOYED_KAFKA_CONSUMER
export CONSUMER_SA_EMAIL=KAFKA_CONSUMER_ACCOUNT_EMAIL # For example, NAME@PROJECT_ID.iam.gserviceaccount.com
export TOPIC_ID=KAFKA_TOPIC_ID
export CONSUMER_GROUP_ID=KAFKA_CONSUMER_GROUP_ID
export NETWORK=VPC_NETWORK
export SUBNET=VPC_SUBNET

# Details for new items to be created during this setup
export CLOUD_TASKS_QUEUE_NAME=CLOUD_TASKS_QUEUE_FOR_SCALING_CHECKS
export TASKS_SERVICE_ACCOUNT=TASKS_SERVICE_ACCOUNT_NAME

export SCALER_SERVICE_NAME=KAFKA_AUTOSCALER_SERVICE_NAME
export SCALER_IMAGE_PATH=KAFKA_AUTOSCALER_IMAGE_URI
export SCALER_CONFIG_SECRET=KAFKA_AUTOSCALER_CONFIG_SECRET_NAME

export CYCLE_SECONDS=SCALER_CHECK_FREQUENCY # For example, 15; this value should be at least 5 seconds.

export OUTPUT_SCALER_METRICS=false # If you want scaling metrics to outputted to Cloud Monitoring set this to true and ensure your scaler service account has permission to write metrics (for example, via roles/monitoring.metricWriter).

Substitua:

  • PROJECT_ID: o ID do projeto em que o serviço de escalonamento automático do Kafka está localizado.
  • REGION: o local do serviço de escalonador automático do Kafka.
  • DEPLOYED_KAFKA_CONSUMER: o nome do consumidor do Kafka.
  • KAFKA_CONSUMER_ACCOUNT_EMAIL: o e-mail da conta de serviço para o consumidor do Kafka.
  • KAFKA_TOPIC_ID: o ID do tópico a que os consumidores do Kafka se inscrevem.
  • KAFKA_CONSUMER_GROUP_ID: o ID do grupo de consumidores usado pelo consumidor do Kafka de destino. Esses valores precisam corresponder, caso contrário, o escalonamento automático vai falhar.
  • VPC_NETWORK: a rede VPC conectada ao cluster do Kafka.
  • VPC_SUBNET: a sub-rede da VPC conectada ao cluster do Kafka.
  • CLOUD_TASKS_QUEUE_FOR_SCALING_CHECKS: a fila configurada do Cloud Tasks para acionar verificações de escalonamento automático.
  • TASKS_SERVICE_ACCOUNT_NAME: a conta de serviço que o Cloud Tasks deve usar para acionar verificações de escalonamento automático.
  • KAFKA_AUTOSCALER_SERVICE_NAME: o nome do seu serviço de escalonador automático do Kafka.
  • KAFKA_AUTOSCALER_IMAGE_URI: o URI da imagem do escalonador automático do Kafka.
  • KAFKA_AUTOSCALER_CONFIG_SECRET_NAME: o nome do secret de escalonamento.
  • SCALER_CHECK_FREQUENCY: o período do ciclo do escalonador automático, em segundos.

Executar o script de configuração

Execute o script setup_kafka_scaler.sh fornecido:

./setup_kafka_scaler.sh

O script executa estas ações:

  • Cria a fila do Cloud Tasks usada para acionar verificações de escalonamento automático.
  • Cria a conta de serviço do escalonador automático do Kafka e concede as permissões necessárias.
  • Configura e implanta o escalonador automático do Kafka.
  • Cria o job do Cloud Scheduler que aciona periodicamente as verificações de escalonamento automático.

Quando o script setup_kafka_scaler.sh é executado, ele gera as variáveis de ambiente configuradas. Verifique se as variáveis de ambiente estão corretas antes de continuar.

Conceder outras permissões

Para mudar a contagem de instâncias do consumidor do Kafka, a conta de serviço do escalonador automático do Kafka precisa ter permissão de visualização na imagem do contêiner implantado. Por exemplo, se a imagem do consumidor foi implantada do Artifact Registry, execute o seguinte comando:

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member="serviceAccount:$SCALER_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com" \
  --role="roles/artifactregistry.reader" # Or appropriate role for your registry

Verificar se o escalonamento automático do Kafka está funcionando

O escalonamento automático do serviço Kafka é acionado com uma solicitação para o URL do serviço (SCALER_SERVICE_NAME-PROJECT_NUMBER.REGION.run.app).

É possível enviar uma solicitação POST ao serviço de escalonamento automático do Kafka para acionar o cálculo do escalonamento automático:

curl -X POST -H "Authorization: Bearer $(gcloud auth print-identity-token)" https://SCALER_SERVICE_NAME-PROJECT_NUMBER.REGION.run.app

Substitua:

  • SCALER_SERVICE_NAME: o nome do seu serviço de escalonador automático do Kafka.
  • PROJECT_NUMBER: o número do projeto do serviço de escalonador automático do Kafka.
  • REGION: o local do serviço de escalonador automático do Kafka.

As solicitações POST acionam o cálculo do escalonamento automático, a saída para o registro em registros, e mudam a contagem de instâncias com base na recomendação.

Os registros do seu serviço de escalonamento automático do Kafka precisam incluir mensagens como [SCALING] Recommended instances X.

Se a flag OUTPUT_SCALER_METRICS estiver ativada, você também poderá encontrar métricas de escalonador do Cloud Monitoring em custom.googleapis.com/cloud-run-kafkascaler.

Configuração avançada de escalonamento

spec:
  metrics:
  behavior:
    scaleDown:
      stabilizationWindowSeconds: [INT]
      policies:
      - type: [Percent, Instances]
        value: [INT]
        periodSeconds: [INT]
      selectPolicy: [Min, Max]
    scaleUp:
      stabilizationWindowSeconds: [INT]
      policies:
      - type: [Percent, Instances]
        value: [INT]
        periodSeconds: [INT]
      selectPolicy: [Min, Max]

A lista a seguir descreve alguns dos elementos anteriores:

  • scaleDown: o comportamento ao reduzir a contagem de instâncias (redução de escalonamento).
  • scaleUp: o comportamento ao aumentar a contagem de instâncias (escalonamento vertical).
  • stabilizationWindowSeconds: o número mais alto (scaleDown) ou mais baixo (scaleUp) de instâncias calculado em um período contínuo. Definir o valor como 0 significa que o valor calculado mais recente será usado.
  • selectPolicy: o resultado a ser aplicado quando várias políticas são configuradas.
  • Min: a menor mudança
  • Max: a maior mudança
  • Percent: as mudanças por período são limitadas à porcentagem configurada do total de instâncias.
  • Instances: as mudanças por período são limitadas ao número configurado de instâncias.
  • periodSeconds: o período em que a política é aplicada.

Por exemplo, a especificação completa, usando a configuração padrão, é assim:

spec:
  scaleTargetRef:
    name: projects/PROJECT-ID/locations/us-central1/workerpools/kafka-consumer-worker
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 60
          activationThreshold: 0
          tolerance: 0.1
          windowSeconds: 120
    - type: External
      external:
        metric:
          name: consumer_lag
        target:
          type: AverageValue
          averageValue: 1000
          activationThreshold: 0
          tolerance: 0.1
  behavior:
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
        - type: Percent
          value: 50
          periodSeconds: 30
      selectPolicy: Min
    scaleUp:
      stabilizationWindowSeconds: 0
      policies:
        - type: Percent
          value: 100
          periodSeconds: 15
        - type: Instances
          value: 4
          periodSeconds: 15
      selectPolicy: Max