Apache Kafka 到 Cloud Storage 範本

Apache Kafka to Cloud Storage 範本是串流管道,可從 Google Cloud Managed Service for Apache Kafka 擷取文字資料,並將記錄輸出至 Cloud Storage。

您也可以搭配自行管理或外部 Kafka 使用 Apache Kafka 至 BigQuery 範本。

管道相關規定

  • 輸出 Cloud Storage 值區必須已存在。
  • Apache Kafka 代理程式伺服器必須正在執行,且可從 Dataflow 工作站機器連線。
  • Apache Kafka 主題必須存在。

Kafka 訊息格式

這個範本支援以下列格式從 Kafka 讀取訊息:

JSON 格式

如要讀取 JSON 訊息,請將 messageFormat 範本參數設為 "JSON"

Avro 二進位編碼

如要讀取二進位 Avro 訊息,請設定下列範本參數:

  • messageFormat"AVRO_BINARY_ENCODING"
  • binaryAvroSchemaPath:Cloud Storage 中 Avro 結構定義檔案的位置。例如:gs://BUCKET_NAME/message-schema.avsc

如要進一步瞭解 Avro 二進位格式,請參閱 Apache Avro 說明文件中的「二進位編碼」。

以 Confluent 結構定義儲存庫編碼的 Avro

如要讀取 Confluent Schema Registry 編碼的 Avro 訊息,請設定下列範本參數:

  • messageFormat"AVRO_CONFLUENT_WIRE_FORMAT"

  • schemaFormat:下列其中一個值:
    • "SINGLE_SCHEMA_FILE":訊息結構定義是在 Avro 結構定義檔案中定義。 在 confluentAvroSchemaPath 參數中指定結構定義檔案的 Cloud Storage 位置。
    • "SCHEMA_REGISTRY":訊息會使用 Confluent Schema Registry 編碼。 在 schemaRegistryConnectionUrl 參數中指定 Confluent Schema Registry 執行個體的網址,並在 schemaRegistryAuthenticationMode 參數中指定驗證模式。

如要進一步瞭解這個格式,請參閱 Confluent 說明文件中的「 Wire format」。

輸出檔案格式

輸出檔案格式與輸入 Kafka 訊息的格式相同。舉例來說,如果您選取 JSON 做為 Kafka 訊息格式,系統會將 JSON 檔案寫入輸出 Cloud Storage 值區。

驗證

Apache Kafka 到 Cloud Storage 範本支援對 Kafka 代理程式進行 SASL/PLAIN 驗證。

範本參數

必要參數

  • readBootstrapServerAndTopic:要讀取當中輸入內容的 Kafka 主題。
  • outputDirectory:寫入輸出檔案的路徑和檔案名稱前置字串,結尾必須為斜線。例如:gs://your-bucket/your-path/
  • kafkaReadAuthenticationMode:要搭配 Kafka 叢集使用的驗證模式。使用 KafkaAuthenticationMethod.NONE 進行無驗證,使用 KafkaAuthenticationMethod.SASL_PLAIN 進行 SASL/PLAIN 使用者名稱和密碼驗證,使用 KafkaAuthenticationMethod.SASL_SCRAM_512 進行 SASL_SCRAM_512 驗證,使用 KafkaAuthenticationMethod.TLS 進行憑證型驗證。KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS 僅適用於 Google Cloud Apache Kafka for BigQuery 叢集,可使用應用程式預設憑證進行驗證。
  • messageFormat:要讀取的 Kafka 訊息格式。支援的值為 AVRO_CONFLUENT_WIRE_FORMAT (Confluent Schema Registry 編碼的 Avro)、AVRO_BINARY_ENCODING (純二進位 Avro) 和 JSON。預設值為:AVRO_CONFLUENT_WIRE_FORMAT。
  • useBigQueryDLQ:如果為 true,系統會將失敗的訊息連同額外的錯誤資訊寫入 BigQuery。預設值為 false。

選用參數

  • windowDuration:資料寫入 Cloud Storage 的時段間隔/大小。允許的格式為 Ns (以秒為單位例如 5s)、Nm (以分鐘為單位例如 12m)、Nh (以小時為單位 2h)。例如,5m。預設值為 5 分鐘。
  • outputFilenamePrefix:加在每個固定時段檔案的前置字串,例如,output-。預設值為 output。
  • numShards:寫入時產生的輸出資料分割數量上限;分片數量越多,寫入 Cloud Storage 的處理量就越高,但處理輸出 Cloud Storage 檔案時,分片間的資料彙整費用可能會更高。預設值由 Dataflow 決定。
  • enableCommitOffsets:將已處理訊息的偏移量提交至 Kafka。啟用後,重新啟動管道時,系統會盡量減少訊息處理作業的間隔或重複情形。必須指定用戶群組 ID。預設值為 false。
  • consumerGroupId:這個管道所屬消費者群組的專屬 ID。如果已啟用「將偏移量提交至 Kafka」,則為必要欄位。預設為空白。
  • kafkaReadOffset:在未提交偏移時讀取訊息的起始點。選取「最早」會從頭開始讀取,選取「最晚」則從最新訊息開始。預設值為「latest」。
  • kafkaReadUsernameSecretId:Google Cloud Secret Manager 密碼 ID,內含要搭配 SASL_PLAIN 驗證使用的 Kafka 使用者名稱。例如 projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>。預設為空白。
  • kafkaReadPasswordSecretId:Google Cloud Secret Manager 密鑰 ID,內含要搭配 SASL_PLAIN 驗證使用的 Kafka 密碼。例如 projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>。預設為空白。
  • kafkaReadKeystoreLocation:Java KeyStore (JKS) 檔案的 Google Cloud Storage 路徑,該檔案含有與 Kafka 叢集進行驗證時使用的 TLS 憑證和私密金鑰。例如:gs://your-bucket/keystore.jks
  • kafkaReadTruststoreLocation:Java TrustStore (JKS) 檔案的 Google Cloud Storage 路徑,該檔案含有用來驗證 Kafka 代理程式身分的受信任憑證。
  • kafkaReadTruststorePasswordSecretId:Google Cloud Secret Manager 密碼 ID,內含用於存取 Java TrustStore (JKS) 檔案的密碼,以進行 Kafka TLS 驗證。例如:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • kafkaReadKeystorePasswordSecretId:Google Cloud Secret Manager 密鑰 ID,內含用於存取 Java KeyStore (JKS) 檔案的密碼,以進行 Kafka TLS 驗證。例如:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • kafkaReadKeyPasswordSecretId:Google Cloud Secret Manager 密鑰 ID,內含用於存取 Java KeyStore (JKS) 檔案中私密金鑰的密碼,以進行 Kafka TLS 驗證。例如:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • kafkaReadSaslScramUsernameSecretId:Google Cloud Secret Manager 密碼 ID,內含要搭配 SASL_SCRAM 驗證使用的 Kafka 使用者名稱。例如:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • kafkaReadSaslScramPasswordSecretId:Google Cloud Secret Manager 密鑰 ID,內含要搭配 SASL_SCRAM 驗證使用的 Kafka 密碼。例如:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • kafkaReadSaslScramTruststoreLocation:Java TrustStore (JKS) 檔案的 Google Cloud Storage 路徑,該檔案含有用於驗證 Kafka 代理程式身分識別的信任憑證。
  • kafkaReadSaslScramTruststorePasswordSecretId:Google Cloud Secret Manager 密碼 ID,內含用於存取 Java TrustStore (JKS) 檔案的密碼,以進行 Kafka SASL_SCRAM 驗證。例如 projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • schemaFormat:Kafka 結構定義格式。可以提供為 SINGLE_SCHEMA_FILESCHEMA_REGISTRY。如果指定 SINGLE_SCHEMA_FILE,請對所有訊息使用 Avro 結構定義檔案中提及的結構定義。如果指定 SCHEMA_REGISTRY,訊息可以具有單一或多個結構定義。預設值為:SINGLE_SCHEMA_FILE。
  • confluentAvroSchemaPath:Google Cloud Storage 路徑,指向用於解碼主題中所有訊息的單一 Avro 結構定義檔案。預設為空白。
  • schemaRegistryConnectionUrl:用於管理訊息解碼 Avro 結構定義的 Confluent Schema Registry 執行個體網址。預設為空白。
  • binaryAvroSchemaPath:Avro 結構定義檔案的 Google Cloud Storage 路徑,用於解碼採用二進位編碼的 Avro 訊息。預設為空白。
  • schemaRegistryAuthenticationMode:結構定義儲存庫驗證模式。可以是 NONE、TLS 或 OAUTH。預設值為 NONE。
  • schemaRegistryTruststoreLocation:SSL 憑證的位置,用於儲存驗證 Schema Registry 的信任存放區。例如:/your-bucket/truststore.jks
  • schemaRegistryTruststorePasswordSecretId:Secret Manager 中的 SecretId,用於儲存存取信任儲存區中密碼的密碼。例如:projects/your-project-number/secrets/your-secret-name/versions/your-secret-version
  • schemaRegistryKeystoreLocation:包含 SSL 憑證和私密金鑰的 KeyStore 位置。例如:/your-bucket/keystore.jks
  • schemaRegistryKeystorePasswordSecretId:密碼管理工具中的 SecretId,用於存取金鑰儲存區檔案。例如 projects/your-project-number/secrets/your-secret-name/versions/your-secret-version
  • schemaRegistryKeyPasswordSecretId:存取用戶端私密金鑰所需的密碼 SecretId,該金鑰儲存在金鑰儲存區中。例如 projects/your-project-number/secrets/your-secret-name/versions/your-secret-version
  • schemaRegistryOauthClientId:用於在 OAUTH 模式中驗證 Schema Registry 用戶端的用戶端 ID。AVRO_CONFLUENT_WIRE_FORMAT 訊息格式的必要欄位。
  • schemaRegistryOauthClientSecretId:Google Cloud Secret Manager 密鑰 ID,內含用於以 OAUTH 模式驗證 Schema Registry 用戶端的用戶端密鑰。AVRO_CONFLUENT_WIRE_FORMAT 訊息格式必須使用此欄位。例如:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • schemaRegistryOauthScope:用於在 OAUTH 模式中驗證 Schema Registry 用戶端的存取權杖範圍。這個欄位為選填,因為要求可以不傳遞範圍參數。例如:openid
  • schemaRegistryOauthTokenEndpointUrl:以 HTTP(S) 為基礎的 OAuth/OIDC 識別資訊提供者網址,用於在 OAUTH 模式中驗證 Schema Registry 用戶端。AVRO_CONFLUENT_WIRE_FORMAT 訊息格式的必要欄位。
  • outputDeadletterTable:失敗訊息的完全合格 BigQuery 資料表名稱。訊息無法到達輸出資料表的所有原因 (例如結構定義不相符、JSON 格式錯誤) 會寫入此資料表。此資料表會由範本建立。例如:your-project-id:your-dataset.your-table-name

執行範本

控制台

  1. 前往 Dataflow 的「Create job from template」(透過範本建立工作) 頁面。
  2. 前往「依據範本建立工作」
  3. 在「工作名稱」欄位中,輸入專屬工作名稱。
  4. 選用:如要使用區域端點,請從下拉式選單中選取值。預設區域為 us-central1

    如需可執行 Dataflow 工作的地區清單,請參閱「Dataflow 位置」。

  5. 從「Dataflow template」(Dataflow 範本) 下拉式選單中選取 the Kafka to Cloud Storage template。
  6. 在提供的參數欄位中輸入參數值。
  7. 選用:如要從「僅需處理一次」切換至「至少一次」串流模式,請選取「至少一次」
  8. 按一下「Run Job」(執行工作)

gcloud

在殼層或終端機中執行範本:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Gcs_Flex \
    --parameters \
readBootstrapServerAndTopic=BOOTSTRAP_SERVER_AND_TOPIC,\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
messageFormat=JSON,\
outputDirectory=gs://STORAGE_BUCKET_NAME,\
useBigQueryDLQ=false
  

更改下列內容:

  • PROJECT_ID: 您要執行 Dataflow 工作的專案 ID Google Cloud
  • JOB_NAME: 您選擇的不重複工作名稱
  • REGION_NAME: 您要部署 Dataflow 工作的地區,例如 us-central1
  • VERSION: 您要使用的範本版本

    您可以使用下列值:

  • BOOTSTRAP_SERVER_AND_TOPIC:Apache Kafka 啟動伺服器位址和主題

    啟動伺服器位址和主題的格式取決於叢集類型:

    • Managed Service for Apache Kafka 叢集: projects/PROJECT_ID/locations/REGION_NAME/clusters/CLUSTER_NAME/topics/TOPIC_NAME
    • 外部 Kafka 叢集: BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
  • STORAGE_BUCKET_NAME:輸出內容寫入的 Cloud Storage bucket

API

如要使用 REST API 執行範本,請傳送 HTTP POST 要求。如要進一步瞭解 API 和授權範圍,請參閱 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "readBootstrapServerAndTopic": "BOOTSTRAP_SERVER_AND_TOPIC",
          "kafkaReadAuthenticationMode": "APPLICATION_DEFAULT_CREDENTIALS",
          "messageFormat": "JSON",
          "outputDirectory": "gs://STORAGE_BUCKET_NAME",
          "useBigQueryDLQ": "false"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Gcs_Flex",
   }
}
  

更改下列內容:

  • PROJECT_ID: 您要執行 Dataflow 工作的專案 ID Google Cloud
  • JOB_NAME: 您選擇的不重複工作名稱
  • LOCATION: 您要部署 Dataflow 工作的地區,例如 us-central1
  • VERSION: 您要使用的範本版本

    您可以使用下列值:

  • BOOTSTRAP_SERVER_AND_TOPIC:Apache Kafka 啟動伺服器位址和主題

    啟動伺服器位址和主題的格式取決於叢集類型:

    • Managed Service for Apache Kafka 叢集: projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_NAME/topics/TOPIC_NAME
    • 外部 Kafka 叢集: BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
  • STORAGE_BUCKET_NAME:輸出內容寫入的 Cloud Storage bucket

後續步驟