Apache Kafka 到 Kafka 範本

「Apache Kafka 到 Apache Kafka」範本會建立串流管道,從 Apache Kafka 來源擷取位元組形式的資料,然後將位元組寫入 Apache Kafka 接收器。

管道相關規定

  • Apache Kafka 來源主題必須存在。
  • Apache Kafka 來源和接收器代理程式伺服器必須正在執行,且可從 Dataflow 工作站機器連線。
  • 如果您使用 Google Cloud Managed Service for Apache Kafka 做為來源或接收器,必須先建立主題,才能啟動範本。

Kafka 訊息格式

系統會以位元組形式讀取 Apache Kafka 來源訊息,並將位元組寫入 Apache Kafka 接收器。

驗證

Apache Kafka 到 Apache Kafka 範本支援 SASL/PLAINTLS 驗證,可連線至 Kafka 代理程式。

範本參數

必要參數

  • readBootstrapServerAndTopic:Kafka Bootstrap 伺服器和要讀取當中輸入內容的主題。例如:localhost:9092;topic1,topic2
  • kafkaReadAuthenticationMode:要搭配 Kafka 叢集使用的驗證模式。使用 KafkaAuthenticationMethod.NONE 進行無驗證,使用 KafkaAuthenticationMethod.SASL_PLAIN 進行 SASL/PLAIN 使用者名稱和密碼驗證,使用 KafkaAuthenticationMethod.SASL_SCRAM_512 進行 SASL_SCRAM_512 驗證,使用 KafkaAuthenticationMethod.TLS 進行憑證型驗證。KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS 僅適用於 Google Cloud Apache Kafka for BigQuery 叢集,可使用應用程式預設憑證進行驗證。
  • writeBootstrapServerAndTopic:要寫入輸出的 Kafka 主題。
  • kafkaWriteAuthenticationMethod:要搭配 Kafka 叢集使用的驗證模式。如果不需要驗證,請使用 NONE;如要使用 SASL/PLAIN 使用者名稱和密碼,請使用 SASL_PLAIN;如要使用 SASL_SCRAM_512 驗證,請使用 SASL_SCRAM_512;如要使用憑證驗證,請使用 TLS。預設值為:APPLICATION_DEFAULT_CREDENTIALS。

選用參數

  • enableCommitOffsets:將已處理訊息的偏移量提交至 Kafka。啟用後,重新啟動管道時,系統會盡量減少訊息處理作業的間隔或重複情形。必須指定用戶群組 ID。預設值為 false。
  • consumerGroupId:這個管道所屬消費者群組的專屬 ID。如果已啟用「將偏移量提交至 Kafka」,則為必要欄位。預設為空白。
  • kafkaReadOffset:在未提交偏移時讀取訊息的起始點。選取「最早」會從頭開始讀取,選取「最晚」則從最新訊息開始。預設值為「latest」。
  • kafkaReadUsernameSecretId:Google Cloud Secret Manager 密碼 ID,內含要搭配 SASL_PLAIN 驗證使用的 Kafka 使用者名稱。例如 projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>。預設為空白。
  • kafkaReadPasswordSecretId:Google Cloud Secret Manager 密鑰 ID,內含要搭配 SASL_PLAIN 驗證使用的 Kafka 密碼。例如 projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>。預設為空白。
  • kafkaReadKeystoreLocation:Java KeyStore (JKS) 檔案的 Google Cloud Storage 路徑,該檔案含有與 Kafka 叢集進行驗證時使用的 TLS 憑證和私密金鑰。例如:gs://your-bucket/keystore.jks
  • kafkaReadTruststoreLocation:Java TrustStore (JKS) 檔案的 Google Cloud Storage 路徑,該檔案含有用來驗證 Kafka 代理程式身分的受信任憑證。
  • kafkaReadTruststorePasswordSecretId:Google Cloud Secret Manager 密碼 ID,內含用於存取 Java TrustStore (JKS) 檔案的密碼,以進行 Kafka TLS 驗證。例如:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • kafkaReadKeystorePasswordSecretId:Google Cloud Secret Manager 密鑰 ID,內含用於存取 Java KeyStore (JKS) 檔案的密碼,以進行 Kafka TLS 驗證。例如:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • kafkaReadKeyPasswordSecretId:Google Cloud Secret Manager 密鑰 ID,內含用於存取 Java KeyStore (JKS) 檔案中私密金鑰的密碼,以進行 Kafka TLS 驗證。例如:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • kafkaReadSaslScramUsernameSecretId:Google Cloud Secret Manager 密碼 ID,內含要搭配 SASL_SCRAM 驗證使用的 Kafka 使用者名稱。例如:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • kafkaReadSaslScramPasswordSecretId:Google Cloud Secret Manager 密鑰 ID,內含要搭配 SASL_SCRAM 驗證使用的 Kafka 密碼。例如:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • kafkaReadSaslScramTruststoreLocation:Java TrustStore (JKS) 檔案的 Google Cloud Storage 路徑,該檔案含有用於驗證 Kafka 代理程式身分識別的信任憑證。
  • kafkaReadSaslScramTruststorePasswordSecretId:Google Cloud Secret Manager 密碼 ID,內含用於存取 Java TrustStore (JKS) 檔案的密碼,以進行 Kafka SASL_SCRAM 驗證。例如 projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • kafkaWriteUsernameSecretId:Google Cloud Secret Manager 密碼 ID,內含 Kafka 使用者名稱,用於透過 SASL_PLAIN 驗證目的地 Kafka 叢集。例如,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>。預設為空白。
  • kafkaWritePasswordSecretId:Google Cloud Secret Manager 密碼 ID,內含用於 SASL_PLAIN 驗證的 Kafka 密碼,可與目的地 Kafka 叢集搭配使用。例如,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>。預設為空白。
  • kafkaWriteKeystoreLocation:Java KeyStore (JKS) 檔案的 Google Cloud Storage 路徑,該檔案含有 TLS 憑證和私密金鑰,可用於向目的地 Kafka 叢集進行驗證。例如:gs://<BUCKET>/<KEYSTORE>.jks
  • kafkaWriteTruststoreLocation:Java TrustStore (JKS) 檔案的 Google Cloud Storage 路徑,該檔案含有用來驗證目的地 Kafka 代理程式身分的受信任憑證。
  • kafkaWriteTruststorePasswordSecretId:Google Cloud Secret Manager 密鑰 ID,內含用於存取 Java TrustStore (JKS) 檔案的密碼,以便與目的地 Kafka 叢集進行 TLS 驗證。例如:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • kafkaWriteKeystorePasswordSecretId:Google Cloud Secret Manager 密鑰 ID,內含存取 Java KeyStore (JKS) 檔案的密碼,用於與目的地 Kafka 叢集進行 TLS 驗證。例如:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • kafkaWriteKeyPasswordSecretId:Google Cloud Secret Manager 密鑰 ID,內含用來存取 Java KeyStore (JKS) 檔案中私密金鑰的密碼,以便與目的地 Kafka 叢集進行 TLS 驗證。例如:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>

執行範本

控制台

  1. 前往 Dataflow 的「Create job from template」(透過範本建立工作) 頁面。
  2. 前往「依據範本建立工作」
  3. 在「工作名稱」欄位中,輸入專屬工作名稱。
  4. 選用:如要使用區域端點,請從下拉式選單中選取值。預設區域為 us-central1

    如需可執行 Dataflow 工作的地區清單,請參閱「Dataflow 位置」。

  5. 從「Dataflow template」(Dataflow 範本) 下拉式選單中選取 the Kafka to Cloud Storage template。
  6. 在提供的參數欄位中輸入參數值。
  7. 選用:如要從「僅需處理一次」切換至「至少一次」串流模式,請選取「至少一次」
  8. 按一下「Run Job」(執行工作)

gcloud

在殼層或終端機中執行範本:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Kafka \
    --parameters \
readBootstrapServerAndTopic=READ_BOOTSTRAP_SERVER_AND_TOPIC,\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
writeBootstrapServerAndTopic=WRITE_BOOTSTRAP_SERVER_AND_TOPIC,\
kafkaWriteAuthenticationMethod=APPLICATION_DEFAULT_CREDENTIALS
  

更改下列內容:

  • PROJECT_ID: 您要執行 Dataflow 工作的專案 ID Google Cloud
  • JOB_NAME: 您選擇的不重複工作名稱
  • REGION_NAME: 您要部署 Dataflow 工作的地區,例如 us-central1
  • VERSION: 您要使用的範本版本

    您可以使用下列值:

  • READ_BOOTSTRAP_SERVER_AND_TOPIC:Apache Kafka 啟動伺服器位址和要讀取的主題
  • WRITE_BOOTSTRAP_SERVER_AND_TOPIC:Apache Kafka 啟動伺服器位址和要寫入的主題

    啟動伺服器位址和主題的格式取決於叢集類型:

    • Managed Service for Apache Kafka 叢集: projects/PROJECT_ID/locations/REGION_NAME/clusters/CLUSTER_NAME/topics/TOPIC_NAME
    • 外部 Kafka 叢集: BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME

API

如要使用 REST API 執行範本,請傳送 HTTP POST 要求。如要進一步瞭解 API 和授權範圍,請參閱 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "readBootstrapServerAndTopic": "READ_BOOTSTRAP_SERVER_AND_TOPIC",
          "kafkaReadAuthenticationMode": "APPLICATION_DEFAULT_CREDENTIALS",
          "writeBootstrapServerAndTopic": "WRITE_BOOTSTRAP_SERVER_AND_TOPIC",
          "kafkaWriteAuthenticationMethod": "APPLICATION_DEFAULT_CREDENTIALS
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Kafka",
   }
}
  

更改下列內容:

  • PROJECT_ID: 您要執行 Dataflow 工作的專案 ID Google Cloud
  • JOB_NAME: 您選擇的不重複工作名稱
  • LOCATION: 您要部署 Dataflow 工作的地區,例如 us-central1
  • VERSION: 您要使用的範本版本

    您可以使用下列值:

  • READ_BOOTSTRAP_SERVER_AND_TOPIC:Apache Kafka 啟動伺服器位址和要讀取的主題
  • WRITE_BOOTSTRAP_SERVER_AND_TOPIC:Apache Kafka 啟動伺服器位址和要寫入的主題

    啟動伺服器位址和主題的格式取決於叢集類型:

    • Managed Service for Apache Kafka 叢集: projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_NAME/topics/TOPIC_NAME
    • 外部 Kafka 叢集: BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME

後續步驟