Apache Kafka to Cloud Storage 範本是串流管道,可從 Google Cloud Managed Service for Apache Kafka 擷取文字資料,並將記錄輸出至 Cloud Storage。
您也可以搭配自行管理或外部 Kafka 使用 Apache Kafka 至 BigQuery 範本。
管道相關規定
- 輸出 Cloud Storage 值區必須已存在。
- Apache Kafka 代理程式伺服器必須正在執行,且可從 Dataflow 工作站機器連線。
- Apache Kafka 主題必須存在。
Kafka 訊息格式
這個範本支援以下列格式從 Kafka 讀取訊息:
JSON 格式
如要讀取 JSON 訊息,請將 messageFormat
範本參數設為 "JSON"
。
Avro 二進位編碼
如要讀取二進位 Avro 訊息,請設定下列範本參數:
messageFormat
:"AVRO_BINARY_ENCODING"
。binaryAvroSchemaPath
:Cloud Storage 中 Avro 結構定義檔案的位置。例如:gs://BUCKET_NAME/message-schema.avsc
。
如要進一步瞭解 Avro 二進位格式,請參閱 Apache Avro 說明文件中的「二進位編碼」。
以 Confluent 結構定義儲存庫編碼的 Avro
如要讀取 Confluent Schema Registry 編碼的 Avro 訊息,請設定下列範本參數:
messageFormat
:"AVRO_CONFLUENT_WIRE_FORMAT"
。schemaFormat
:下列其中一個值:"SINGLE_SCHEMA_FILE"
:訊息結構定義是在 Avro 結構定義檔案中定義。 在confluentAvroSchemaPath
參數中指定結構定義檔案的 Cloud Storage 位置。-
"SCHEMA_REGISTRY"
:訊息會使用 Confluent Schema Registry 編碼。 在schemaRegistryConnectionUrl
參數中指定 Confluent Schema Registry 執行個體的網址,並在schemaRegistryAuthenticationMode
參數中指定驗證模式。
如要進一步瞭解這個格式,請參閱 Confluent 說明文件中的「 Wire format」。
輸出檔案格式
輸出檔案格式與輸入 Kafka 訊息的格式相同。舉例來說,如果您選取 JSON 做為 Kafka 訊息格式,系統會將 JSON 檔案寫入輸出 Cloud Storage 值區。
驗證
Apache Kafka 到 Cloud Storage 範本支援對 Kafka 代理程式進行 SASL/PLAIN 驗證。
範本參數
必要參數
- readBootstrapServerAndTopic:要讀取當中輸入內容的 Kafka 主題。
- outputDirectory:寫入輸出檔案的路徑和檔案名稱前置字串,結尾必須為斜線。例如:
gs://your-bucket/your-path/
。 - kafkaReadAuthenticationMode:要搭配 Kafka 叢集使用的驗證模式。使用
KafkaAuthenticationMethod.NONE
進行無驗證,使用KafkaAuthenticationMethod.SASL_PLAIN
進行 SASL/PLAIN 使用者名稱和密碼驗證,使用KafkaAuthenticationMethod.SASL_SCRAM_512
進行 SASL_SCRAM_512 驗證,使用KafkaAuthenticationMethod.TLS
進行憑證型驗證。KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS
僅適用於 Google Cloud Apache Kafka for BigQuery 叢集,可使用應用程式預設憑證進行驗證。 - messageFormat:要讀取的 Kafka 訊息格式。支援的值為
AVRO_CONFLUENT_WIRE_FORMAT
(Confluent Schema Registry 編碼的 Avro)、AVRO_BINARY_ENCODING
(純二進位 Avro) 和JSON
。預設值為:AVRO_CONFLUENT_WIRE_FORMAT。 - useBigQueryDLQ:如果為 true,系統會將失敗的訊息連同額外的錯誤資訊寫入 BigQuery。預設值為 false。
選用參數
- windowDuration:資料寫入 Cloud Storage 的時段間隔/大小。允許的格式為 Ns (以秒為單位例如 5s)、Nm (以分鐘為單位例如 12m)、Nh (以小時為單位 2h)。例如,
5m
。預設值為 5 分鐘。 - outputFilenamePrefix:加在每個固定時段檔案的前置字串,例如,
output-
。預設值為 output。 - numShards:寫入時產生的輸出資料分割數量上限;分片數量越多,寫入 Cloud Storage 的處理量就越高,但處理輸出 Cloud Storage 檔案時,分片間的資料彙整費用可能會更高。預設值由 Dataflow 決定。
- enableCommitOffsets:將已處理訊息的偏移量提交至 Kafka。啟用後,重新啟動管道時,系統會盡量減少訊息處理作業的間隔或重複情形。必須指定用戶群組 ID。預設值為 false。
- consumerGroupId:這個管道所屬消費者群組的專屬 ID。如果已啟用「將偏移量提交至 Kafka」,則為必要欄位。預設為空白。
- kafkaReadOffset:在未提交偏移時讀取訊息的起始點。選取「最早」會從頭開始讀取,選取「最晚」則從最新訊息開始。預設值為「latest」。
- kafkaReadUsernameSecretId:Google Cloud Secret Manager 密碼 ID,內含要搭配
SASL_PLAIN
驗證使用的 Kafka 使用者名稱。例如projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。預設為空白。 - kafkaReadPasswordSecretId:Google Cloud Secret Manager 密鑰 ID,內含要搭配
SASL_PLAIN
驗證使用的 Kafka 密碼。例如projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。預設為空白。 - kafkaReadKeystoreLocation:Java KeyStore (JKS) 檔案的 Google Cloud Storage 路徑,該檔案含有與 Kafka 叢集進行驗證時使用的 TLS 憑證和私密金鑰。例如:
gs://your-bucket/keystore.jks
。 - kafkaReadTruststoreLocation:Java TrustStore (JKS) 檔案的 Google Cloud Storage 路徑,該檔案含有用來驗證 Kafka 代理程式身分的受信任憑證。
- kafkaReadTruststorePasswordSecretId:Google Cloud Secret Manager 密碼 ID,內含用於存取 Java TrustStore (JKS) 檔案的密碼,以進行 Kafka TLS 驗證。例如:
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。 - kafkaReadKeystorePasswordSecretId:Google Cloud Secret Manager 密鑰 ID,內含用於存取 Java KeyStore (JKS) 檔案的密碼,以進行 Kafka TLS 驗證。例如:
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。 - kafkaReadKeyPasswordSecretId:Google Cloud Secret Manager 密鑰 ID,內含用於存取 Java KeyStore (JKS) 檔案中私密金鑰的密碼,以進行 Kafka TLS 驗證。例如:
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。 - kafkaReadSaslScramUsernameSecretId:Google Cloud Secret Manager 密碼 ID,內含要搭配
SASL_SCRAM
驗證使用的 Kafka 使用者名稱。例如:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。 - kafkaReadSaslScramPasswordSecretId:Google Cloud Secret Manager 密鑰 ID,內含要搭配
SASL_SCRAM
驗證使用的 Kafka 密碼。例如:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。 - kafkaReadSaslScramTruststoreLocation:Java TrustStore (JKS) 檔案的 Google Cloud Storage 路徑,該檔案含有用於驗證 Kafka 代理程式身分識別的信任憑證。
- kafkaReadSaslScramTruststorePasswordSecretId:Google Cloud Secret Manager 密碼 ID,內含用於存取 Java TrustStore (JKS) 檔案的密碼,以進行 Kafka SASL_SCRAM 驗證。例如
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。 - schemaFormat:Kafka 結構定義格式。可以提供為
SINGLE_SCHEMA_FILE
或SCHEMA_REGISTRY
。如果指定SINGLE_SCHEMA_FILE
,請對所有訊息使用 Avro 結構定義檔案中提及的結構定義。如果指定SCHEMA_REGISTRY
,訊息可以具有單一或多個結構定義。預設值為:SINGLE_SCHEMA_FILE。 - confluentAvroSchemaPath:Google Cloud Storage 路徑,指向用於解碼主題中所有訊息的單一 Avro 結構定義檔案。預設為空白。
- schemaRegistryConnectionUrl:用於管理訊息解碼 Avro 結構定義的 Confluent Schema Registry 執行個體網址。預設為空白。
- binaryAvroSchemaPath:Avro 結構定義檔案的 Google Cloud Storage 路徑,用於解碼採用二進位編碼的 Avro 訊息。預設為空白。
- schemaRegistryAuthenticationMode:結構定義儲存庫驗證模式。可以是 NONE、TLS 或 OAUTH。預設值為 NONE。
- schemaRegistryTruststoreLocation:SSL 憑證的位置,用於儲存驗證 Schema Registry 的信任存放區。例如:
/your-bucket/truststore.jks
。 - schemaRegistryTruststorePasswordSecretId:Secret Manager 中的 SecretId,用於儲存存取信任儲存區中密碼的密碼。例如:
projects/your-project-number/secrets/your-secret-name/versions/your-secret-version
。 - schemaRegistryKeystoreLocation:包含 SSL 憑證和私密金鑰的 KeyStore 位置。例如:
/your-bucket/keystore.jks
。 - schemaRegistryKeystorePasswordSecretId:密碼管理工具中的 SecretId,用於存取金鑰儲存區檔案。例如
projects/your-project-number/secrets/your-secret-name/versions/your-secret-version
。 - schemaRegistryKeyPasswordSecretId:存取用戶端私密金鑰所需的密碼 SecretId,該金鑰儲存在金鑰儲存區中。例如
projects/your-project-number/secrets/your-secret-name/versions/your-secret-version
。 - schemaRegistryOauthClientId:用於在 OAUTH 模式中驗證 Schema Registry 用戶端的用戶端 ID。AVRO_CONFLUENT_WIRE_FORMAT 訊息格式的必要欄位。
- schemaRegistryOauthClientSecretId:Google Cloud Secret Manager 密鑰 ID,內含用於以 OAUTH 模式驗證 Schema Registry 用戶端的用戶端密鑰。AVRO_CONFLUENT_WIRE_FORMAT 訊息格式必須使用此欄位。例如:
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。 - schemaRegistryOauthScope:用於在 OAUTH 模式中驗證 Schema Registry 用戶端的存取權杖範圍。這個欄位為選填,因為要求可以不傳遞範圍參數。例如:
openid
。 - schemaRegistryOauthTokenEndpointUrl:以 HTTP(S) 為基礎的 OAuth/OIDC 識別資訊提供者網址,用於在 OAUTH 模式中驗證 Schema Registry 用戶端。AVRO_CONFLUENT_WIRE_FORMAT 訊息格式的必要欄位。
- outputDeadletterTable:失敗訊息的完全合格 BigQuery 資料表名稱。訊息無法到達輸出資料表的所有原因 (例如結構定義不相符、JSON 格式錯誤) 會寫入此資料表。此資料表會由範本建立。例如:
your-project-id:your-dataset.your-table-name
。
執行範本
控制台
- 前往 Dataflow 的「Create job from template」(透過範本建立工作) 頁面。 前往「依據範本建立工作」
- 在「工作名稱」欄位中,輸入專屬工作名稱。
- 選用:如要使用區域端點,請從下拉式選單中選取值。預設區域為
us-central1
。如需可執行 Dataflow 工作的地區清單,請參閱「Dataflow 位置」。
- 從「Dataflow template」(Dataflow 範本) 下拉式選單中選取 the Kafka to Cloud Storage template。
- 在提供的參數欄位中輸入參數值。
- 選用:如要從「僅需處理一次」切換至「至少一次」串流模式,請選取「至少一次」。
- 按一下「Run Job」(執行工作)。
gcloud
在殼層或終端機中執行範本:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Gcs_Flex \ --parameters \ readBootstrapServerAndTopic=BOOTSTRAP_SERVER_AND_TOPIC,\ kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\ messageFormat=JSON,\ outputDirectory=gs://STORAGE_BUCKET_NAME,\ useBigQueryDLQ=false
更改下列內容:
PROJECT_ID
: 您要執行 Dataflow 工作的專案 ID Google CloudJOB_NAME
: 您選擇的不重複工作名稱REGION_NAME
: 您要部署 Dataflow 工作的地區,例如us-central1
VERSION
: 您要使用的範本版本您可以使用下列值:
latest
,使用範本的最新版本,該版本位於值區中非依日期命名的上層資料夾:gs://dataflow-templates-REGION_NAME/latest/- 版本名稱 (例如
2023-09-12-00_RC00
),用於指定範本版本,該版本會以巢狀結構存放在值區中依日期命名的上層資料夾中:gs://dataflow-templates-REGION_NAME/
BOOTSTRAP_SERVER_AND_TOPIC
:Apache Kafka 啟動伺服器位址和主題啟動伺服器位址和主題的格式取決於叢集類型:
- Managed Service for Apache Kafka 叢集:
projects/PROJECT_ID/locations/REGION_NAME/clusters/CLUSTER_NAME/topics/TOPIC_NAME
- 外部 Kafka 叢集:
BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
- Managed Service for Apache Kafka 叢集:
STORAGE_BUCKET_NAME
:輸出內容寫入的 Cloud Storage bucket
API
如要使用 REST API 執行範本,請傳送 HTTP POST 要求。如要進一步瞭解 API 和授權範圍,請參閱 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "readBootstrapServerAndTopic": "BOOTSTRAP_SERVER_AND_TOPIC", "kafkaReadAuthenticationMode": "APPLICATION_DEFAULT_CREDENTIALS", "messageFormat": "JSON", "outputDirectory": "gs://STORAGE_BUCKET_NAME", "useBigQueryDLQ": "false" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Gcs_Flex", } }
更改下列內容:
PROJECT_ID
: 您要執行 Dataflow 工作的專案 ID Google CloudJOB_NAME
: 您選擇的不重複工作名稱LOCATION
: 您要部署 Dataflow 工作的地區,例如us-central1
VERSION
: 您要使用的範本版本您可以使用下列值:
latest
,使用範本的最新版本,該版本位於值區中非依日期命名的上層資料夾:gs://dataflow-templates-REGION_NAME/latest/- 版本名稱 (例如
2023-09-12-00_RC00
),用於指定範本版本,該版本會以巢狀結構存放在值區中依日期命名的上層資料夾中:gs://dataflow-templates-REGION_NAME/
BOOTSTRAP_SERVER_AND_TOPIC
:Apache Kafka 啟動伺服器位址和主題啟動伺服器位址和主題的格式取決於叢集類型:
- Managed Service for Apache Kafka 叢集:
projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_NAME/topics/TOPIC_NAME
- 外部 Kafka 叢集:
BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
- Managed Service for Apache Kafka 叢集:
STORAGE_BUCKET_NAME
:輸出內容寫入的 Cloud Storage bucket
後續步驟
- 瞭解 Dataflow 範本。
- 請參閱 Google 提供的範本清單。