「Apache Kafka 到 Apache Kafka」範本會建立串流管道,從 Apache Kafka 來源擷取位元組形式的資料,然後將位元組寫入 Apache Kafka 接收器。
管道相關規定
- Apache Kafka 來源主題必須存在。
- Apache Kafka 來源和接收器代理程式伺服器必須正在執行,且可從 Dataflow 工作站機器連線。
- 如果您使用 Google Cloud Managed Service for Apache Kafka 做為來源或接收器,必須先建立主題,才能啟動範本。
Kafka 訊息格式
系統會以位元組形式讀取 Apache Kafka 來源訊息,並將位元組寫入 Apache Kafka 接收器。
驗證
Apache Kafka 到 Apache Kafka 範本支援 SASL/PLAIN 和 TLS 驗證,可連線至 Kafka 代理程式。
範本參數
必要參數
- readBootstrapServerAndTopic:Kafka Bootstrap 伺服器和要讀取當中輸入內容的主題。例如:
localhost:9092;topic1,topic2
。 - kafkaReadAuthenticationMode:要搭配 Kafka 叢集使用的驗證模式。使用
KafkaAuthenticationMethod.NONE
進行無驗證,使用KafkaAuthenticationMethod.SASL_PLAIN
進行 SASL/PLAIN 使用者名稱和密碼驗證,使用KafkaAuthenticationMethod.SASL_SCRAM_512
進行 SASL_SCRAM_512 驗證,使用KafkaAuthenticationMethod.TLS
進行憑證型驗證。KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS
僅適用於 Google Cloud Apache Kafka for BigQuery 叢集,可使用應用程式預設憑證進行驗證。 - writeBootstrapServerAndTopic:要寫入輸出的 Kafka 主題。
- kafkaWriteAuthenticationMethod:要搭配 Kafka 叢集使用的驗證模式。如果不需要驗證,請使用 NONE;如要使用 SASL/PLAIN 使用者名稱和密碼,請使用 SASL_PLAIN;如要使用 SASL_SCRAM_512 驗證,請使用 SASL_SCRAM_512;如要使用憑證驗證,請使用 TLS。預設值為:APPLICATION_DEFAULT_CREDENTIALS。
選用參數
- enableCommitOffsets:將已處理訊息的偏移量提交至 Kafka。啟用後,重新啟動管道時,系統會盡量減少訊息處理作業的間隔或重複情形。必須指定用戶群組 ID。預設值為 false。
- consumerGroupId:這個管道所屬消費者群組的專屬 ID。如果已啟用「將偏移量提交至 Kafka」,則為必要欄位。預設為空白。
- kafkaReadOffset:在未提交偏移時讀取訊息的起始點。選取「最早」會從頭開始讀取,選取「最晚」則從最新訊息開始。預設值為「latest」。
- kafkaReadUsernameSecretId:Google Cloud Secret Manager 密碼 ID,內含要搭配
SASL_PLAIN
驗證使用的 Kafka 使用者名稱。例如projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。預設為空白。 - kafkaReadPasswordSecretId:Google Cloud Secret Manager 密鑰 ID,內含要搭配
SASL_PLAIN
驗證使用的 Kafka 密碼。例如projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。預設為空白。 - kafkaReadKeystoreLocation:Java KeyStore (JKS) 檔案的 Google Cloud Storage 路徑,該檔案含有與 Kafka 叢集進行驗證時使用的 TLS 憑證和私密金鑰。例如:
gs://your-bucket/keystore.jks
。 - kafkaReadTruststoreLocation:Java TrustStore (JKS) 檔案的 Google Cloud Storage 路徑,該檔案含有用來驗證 Kafka 代理程式身分的受信任憑證。
- kafkaReadTruststorePasswordSecretId:Google Cloud Secret Manager 密碼 ID,內含用於存取 Java TrustStore (JKS) 檔案的密碼,以進行 Kafka TLS 驗證。例如:
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。 - kafkaReadKeystorePasswordSecretId:Google Cloud Secret Manager 密鑰 ID,內含用於存取 Java KeyStore (JKS) 檔案的密碼,以進行 Kafka TLS 驗證。例如:
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。 - kafkaReadKeyPasswordSecretId:Google Cloud Secret Manager 密鑰 ID,內含用於存取 Java KeyStore (JKS) 檔案中私密金鑰的密碼,以進行 Kafka TLS 驗證。例如:
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。 - kafkaReadSaslScramUsernameSecretId:Google Cloud Secret Manager 密碼 ID,內含要搭配
SASL_SCRAM
驗證使用的 Kafka 使用者名稱。例如:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。 - kafkaReadSaslScramPasswordSecretId:Google Cloud Secret Manager 密鑰 ID,內含要搭配
SASL_SCRAM
驗證使用的 Kafka 密碼。例如:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。 - kafkaReadSaslScramTruststoreLocation:Java TrustStore (JKS) 檔案的 Google Cloud Storage 路徑,該檔案含有用於驗證 Kafka 代理程式身分識別的信任憑證。
- kafkaReadSaslScramTruststorePasswordSecretId:Google Cloud Secret Manager 密碼 ID,內含用於存取 Java TrustStore (JKS) 檔案的密碼,以進行 Kafka SASL_SCRAM 驗證。例如
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。 - kafkaWriteUsernameSecretId:Google Cloud Secret Manager 密碼 ID,內含 Kafka 使用者名稱,用於透過 SASL_PLAIN 驗證目的地 Kafka 叢集。例如,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。預設為空白。 - kafkaWritePasswordSecretId:Google Cloud Secret Manager 密碼 ID,內含用於 SASL_PLAIN 驗證的 Kafka 密碼,可與目的地 Kafka 叢集搭配使用。例如,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。預設為空白。 - kafkaWriteKeystoreLocation:Java KeyStore (JKS) 檔案的 Google Cloud Storage 路徑,該檔案含有 TLS 憑證和私密金鑰,可用於向目的地 Kafka 叢集進行驗證。例如:
gs://<BUCKET>/<KEYSTORE>.jks
。 - kafkaWriteTruststoreLocation:Java TrustStore (JKS) 檔案的 Google Cloud Storage 路徑,該檔案含有用來驗證目的地 Kafka 代理程式身分的受信任憑證。
- kafkaWriteTruststorePasswordSecretId:Google Cloud Secret Manager 密鑰 ID,內含用於存取 Java TrustStore (JKS) 檔案的密碼,以便與目的地 Kafka 叢集進行 TLS 驗證。例如:
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。 - kafkaWriteKeystorePasswordSecretId:Google Cloud Secret Manager 密鑰 ID,內含存取 Java KeyStore (JKS) 檔案的密碼,用於與目的地 Kafka 叢集進行 TLS 驗證。例如:
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。 - kafkaWriteKeyPasswordSecretId:Google Cloud Secret Manager 密鑰 ID,內含用來存取 Java KeyStore (JKS) 檔案中私密金鑰的密碼,以便與目的地 Kafka 叢集進行 TLS 驗證。例如:
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。
執行範本
控制台
- 前往 Dataflow 的「Create job from template」(透過範本建立工作) 頁面。 前往「依據範本建立工作」
- 在「工作名稱」欄位中,輸入專屬工作名稱。
- 選用:如要使用區域端點,請從下拉式選單中選取值。預設區域為
us-central1
。如需可執行 Dataflow 工作的地區清單,請參閱「Dataflow 位置」。
- 從「Dataflow template」(Dataflow 範本) 下拉式選單中選取 the Kafka to Cloud Storage template。
- 在提供的參數欄位中輸入參數值。
- 選用:如要從「僅需處理一次」切換至「至少一次」串流模式,請選取「至少一次」。
- 按一下「Run Job」(執行工作)。
gcloud
在殼層或終端機中執行範本:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Kafka \ --parameters \ readBootstrapServerAndTopic=READ_BOOTSTRAP_SERVER_AND_TOPIC,\ kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\ writeBootstrapServerAndTopic=WRITE_BOOTSTRAP_SERVER_AND_TOPIC,\ kafkaWriteAuthenticationMethod=APPLICATION_DEFAULT_CREDENTIALS
更改下列內容:
PROJECT_ID
: 您要執行 Dataflow 工作的專案 ID Google CloudJOB_NAME
: 您選擇的不重複工作名稱REGION_NAME
: 您要部署 Dataflow 工作的地區,例如us-central1
VERSION
: 您要使用的範本版本您可以使用下列值:
latest
,使用範本的最新版本,該版本位於值區中非依日期命名的上層資料夾:gs://dataflow-templates-REGION_NAME/latest/- 版本名稱 (例如
2023-09-12-00_RC00
),用於指定範本版本,該版本會以巢狀結構存放在值區中依日期命名的上層資料夾中:gs://dataflow-templates-REGION_NAME/
READ_BOOTSTRAP_SERVER_AND_TOPIC
:Apache Kafka 啟動伺服器位址和要讀取的主題WRITE_BOOTSTRAP_SERVER_AND_TOPIC
:Apache Kafka 啟動伺服器位址和要寫入的主題啟動伺服器位址和主題的格式取決於叢集類型:
- Managed Service for Apache Kafka 叢集:
projects/PROJECT_ID/locations/REGION_NAME/clusters/CLUSTER_NAME/topics/TOPIC_NAME
- 外部 Kafka 叢集:
BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
- Managed Service for Apache Kafka 叢集:
API
如要使用 REST API 執行範本,請傳送 HTTP POST 要求。如要進一步瞭解 API 和授權範圍,請參閱 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "readBootstrapServerAndTopic": "READ_BOOTSTRAP_SERVER_AND_TOPIC", "kafkaReadAuthenticationMode": "APPLICATION_DEFAULT_CREDENTIALS", "writeBootstrapServerAndTopic": "WRITE_BOOTSTRAP_SERVER_AND_TOPIC", "kafkaWriteAuthenticationMethod": "APPLICATION_DEFAULT_CREDENTIALS }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Kafka", } }
更改下列內容:
PROJECT_ID
: 您要執行 Dataflow 工作的專案 ID Google CloudJOB_NAME
: 您選擇的不重複工作名稱LOCATION
: 您要部署 Dataflow 工作的地區,例如us-central1
VERSION
: 您要使用的範本版本您可以使用下列值:
latest
,使用範本的最新版本,該版本位於值區中非依日期命名的上層資料夾:gs://dataflow-templates-REGION_NAME/latest/- 版本名稱 (例如
2023-09-12-00_RC00
),用於指定範本版本,該版本會以巢狀結構存放在值區中依日期命名的上層資料夾中:gs://dataflow-templates-REGION_NAME/
READ_BOOTSTRAP_SERVER_AND_TOPIC
:Apache Kafka 啟動伺服器位址和要讀取的主題WRITE_BOOTSTRAP_SERVER_AND_TOPIC
:Apache Kafka 啟動伺服器位址和要寫入的主題啟動伺服器位址和主題的格式取決於叢集類型:
- Managed Service for Apache Kafka 叢集:
projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_NAME/topics/TOPIC_NAME
- 外部 Kafka 叢集:
BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
- Managed Service for Apache Kafka 叢集:
後續步驟
- 瞭解 Dataflow 範本。
- 請參閱 Google 提供的範本清單。