Apache Kafka 到 BigQuery 範本

Apache Kafka 到 BigQuery 範本是串流管道,可從 Google Cloud Managed Service for Apache Kafka 叢集擷取文字資料,然後將產生的記錄輸出至 BigQuery 表格。 將資料插入輸出資料表時發生的任何錯誤,都會插入 BigQuery 中的另一個錯誤資料表。

您也可以搭配自行管理或外部 Kafka 使用 Apache Kafka 至 BigQuery 範本。

管道相關規定

  • Apache Kafka 代理程式伺服器必須正在執行,且可從 Dataflow 工作站機器連線。
  • Apache Kafka 主題必須存在。
  • 您必須啟用 Dataflow、BigQuery 和 Cloud Storage API。如果需要驗證,您也必須啟用 Secret Manager API。
  • 建立具備適當 Kafka 輸入主題結構定義的 BigQuery 資料集和資料表。如果您在同一個主題中使用多個結構定義,並想寫入多個資料表,則不需要在設定管道前建立資料表。
  • 如果範本的無效信件 (未處理的訊息) 佇列已啟用,請建立沒有無效信件佇列結構定義的空白資料表。

Kafka 訊息格式

這個範本支援以下列格式從 Kafka 讀取訊息:

JSON 格式

如要讀取 JSON 訊息,請將 messageFormat 範本參數設為 "JSON"

Avro 二進位編碼

如要讀取二進位 Avro 訊息,請設定下列範本參數:

  • messageFormat"AVRO_BINARY_ENCODING"
  • binaryAvroSchemaPath:Cloud Storage 中 Avro 結構定義檔案的位置。例如:gs://BUCKET_NAME/message-schema.avsc

如要進一步瞭解 Avro 二進位格式,請參閱 Apache Avro 說明文件中的「二進位編碼」。

以 Confluent 結構定義儲存庫編碼的 Avro

如要讀取 Confluent Schema Registry 編碼的 Avro 訊息,請設定下列範本參數:

  • messageFormat"AVRO_CONFLUENT_WIRE_FORMAT"

  • schemaFormat:下列其中一個值:
    • "SINGLE_SCHEMA_FILE":訊息結構定義是在 Avro 結構定義檔案中定義。 在 confluentAvroSchemaPath 參數中指定結構定義檔案的 Cloud Storage 位置。
    • "SCHEMA_REGISTRY":訊息會使用 Confluent Schema Registry 編碼。 在 schemaRegistryConnectionUrl 參數中指定 Confluent Schema Registry 執行個體的網址,並在 schemaRegistryAuthenticationMode 參數中指定驗證模式。

如要進一步瞭解這個格式,請參閱 Confluent 說明文件中的「 Wire format」。

驗證

Apache Kafka 到 BigQuery 範本支援對 Kafka 代理程式進行 SASL/PLAIN 驗證。

範本參數

必要參數

  • readBootstrapServerAndTopic:要讀取當中輸入內容的 Kafka 主題。
  • writeMode:將記錄寫入一個或多個資料表 (視結構定義而定)。DYNAMIC_TABLE_NAMES 模式僅支援 AVRO_CONFLUENT_WIRE_FORMAT 來源訊息格式和 SCHEMA_REGISTRY 結構定義來源。系統會根據每則訊息的 Avro 結構定義名稱自動產生目標資料表名稱,可能是單一結構定義 (建立單一資料表),也可能是多個結構定義 (建立多個資料表)。SINGLE_TABLE_NAME 模式會寫入使用者指定的單一表格 (單一結構定義)。預設值為 SINGLE_TABLE_NAME
  • kafkaReadAuthenticationMode:要搭配 Kafka 叢集使用的驗證模式。使用 KafkaAuthenticationMethod.NONE 進行無驗證,使用 KafkaAuthenticationMethod.SASL_PLAIN 進行 SASL/PLAIN 使用者名稱和密碼驗證,使用 KafkaAuthenticationMethod.SASL_SCRAM_512 進行 SASL_SCRAM_512 驗證,使用 KafkaAuthenticationMethod.TLS 進行憑證型驗證。KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS 僅適用於 Google Cloud Apache Kafka for BigQuery 叢集,可使用應用程式預設憑證進行驗證。
  • messageFormat:要讀取的 Kafka 訊息格式。支援的值為 AVRO_CONFLUENT_WIRE_FORMAT (Confluent Schema Registry 編碼的 Avro)、AVRO_BINARY_ENCODING (純二進位 Avro) 和 JSON。預設值為:AVRO_CONFLUENT_WIRE_FORMAT。
  • useBigQueryDLQ:如果為 true,系統會將失敗的訊息連同額外的錯誤資訊寫入 BigQuery。預設值為 false。

選用參數

  • outputTableSpec:用於寫入輸出內容的 BigQuery 資料表位置。名稱的格式應為 <project>:<dataset>.<table_name>。資料表的結構定義必須與輸入物件相符。
  • persistKafkaKey:如果為 true,管道會將 Kafka 訊息鍵保留在 BigQuery 資料表中,並以 _key 欄位 (類型為 BYTES) 儲存。預設值為 false (系統會忽略金鑰)。
  • outputProject:資料集所在的 BigQuery 輸出專案。系統會在資料集中動態建立資料表。預設為空白。
  • outputDataset:用於寫入輸出內容的 BigQuery 輸出資料集。系統會在資料集中動態建立資料表。如果事先建立資料表,資料表名稱應遵循指定的命名慣例。名稱應為 bqTableNamePrefix + Avro Schema FullName,每個字詞之間以連字號分隔 -。預設為空白。
  • bqTableNamePrefix:建立 BigQuery 輸出資料表時使用的命名前置字串。只有在使用結構定義登錄時才適用。預設為空白。
  • createDisposition:BigQuery CreateDisposition。例如:CREATE_IF_NEEDEDCREATE_NEVER。預設值為 CREATE_IF_NEEDED。
  • writeDisposition:BigQuery WriteDisposition。例如:WRITE_APPENDWRITE_EMPTYWRITE_TRUNCATE。預設值為 WRITE_APPEND。
  • useAutoSharding:如果為 true,管道會在寫入 BigQuery 時使用自動分片。預設值為 true
  • numStorageWriteApiStreams:指定寫入串流的數量,必須設定這個參數。預設值為 0
  • storageWriteApiTriggeringFrequencySec:指定觸發頻率 (以秒為單位),必須設定這項參數。預設值為 5 秒。
  • useStorageWriteApiAtLeastOnce:只有在啟用「使用 BigQuery Storage Write API」時,這個參數才會生效。如果啟用,系統會對 Storage Write API 使用至少一次語意,否則會使用單次語意。預設值為 false。
  • enableCommitOffsets:將已處理訊息的偏移量提交至 Kafka。啟用後,重新啟動管道時,系統會盡量減少訊息處理作業的間隔或重複情形。必須指定用戶群組 ID。預設值為 false。
  • consumerGroupId:這個管道所屬消費者群組的專屬 ID。如果已啟用「將偏移量提交至 Kafka」,則為必要欄位。預設為空白。
  • kafkaReadOffset:在未提交偏移時讀取訊息的起始點。選取「最早」會從頭開始讀取,選取「最晚」則從最新訊息開始。預設值為「latest」。
  • kafkaReadUsernameSecretId:Google Cloud Secret Manager 密碼 ID,內含要搭配 SASL_PLAIN 驗證使用的 Kafka 使用者名稱。例如 projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>。預設為空白。
  • kafkaReadPasswordSecretId:Google Cloud Secret Manager 密鑰 ID,內含要搭配 SASL_PLAIN 驗證使用的 Kafka 密碼。例如 projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>。預設為空白。
  • kafkaReadKeystoreLocation:Java KeyStore (JKS) 檔案的 Google Cloud Storage 路徑,該檔案含有與 Kafka 叢集進行驗證時使用的 TLS 憑證和私密金鑰。例如:gs://your-bucket/keystore.jks
  • kafkaReadTruststoreLocation:Java TrustStore (JKS) 檔案的 Google Cloud Storage 路徑,該檔案含有用來驗證 Kafka 代理程式身分的受信任憑證。
  • kafkaReadTruststorePasswordSecretId:Google Cloud Secret Manager 密碼 ID,內含用於存取 Java TrustStore (JKS) 檔案的密碼,以進行 Kafka TLS 驗證。例如:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • kafkaReadKeystorePasswordSecretId:Google Cloud Secret Manager 密鑰 ID,內含用於存取 Java KeyStore (JKS) 檔案的密碼,以進行 Kafka TLS 驗證。例如:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • kafkaReadKeyPasswordSecretId:Google Cloud Secret Manager 密鑰 ID,內含用於存取 Java KeyStore (JKS) 檔案中私密金鑰的密碼,以進行 Kafka TLS 驗證。例如:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • kafkaReadSaslScramUsernameSecretId:Google Cloud Secret Manager 密碼 ID,內含要搭配 SASL_SCRAM 驗證使用的 Kafka 使用者名稱。例如:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • kafkaReadSaslScramPasswordSecretId:Google Cloud Secret Manager 密鑰 ID,內含要搭配 SASL_SCRAM 驗證使用的 Kafka 密碼。例如:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • kafkaReadSaslScramTruststoreLocation:Java TrustStore (JKS) 檔案的 Google Cloud Storage 路徑,該檔案含有用於驗證 Kafka 代理程式身分識別的信任憑證。
  • kafkaReadSaslScramTruststorePasswordSecretId:Google Cloud Secret Manager 密碼 ID,內含用於存取 Java TrustStore (JKS) 檔案的密碼,以進行 Kafka SASL_SCRAM 驗證。例如 projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • schemaFormat:Kafka 結構定義格式。可以提供為 SINGLE_SCHEMA_FILESCHEMA_REGISTRY。如果指定 SINGLE_SCHEMA_FILE,請對所有訊息使用 Avro 結構定義檔案中提及的結構定義。如果指定 SCHEMA_REGISTRY,訊息可以具有單一或多個結構定義。預設值為:SINGLE_SCHEMA_FILE。
  • confluentAvroSchemaPath:Google Cloud Storage 路徑,指向用於解碼主題中所有訊息的單一 Avro 結構定義檔案。預設為空白。
  • schemaRegistryConnectionUrl:用於管理訊息解碼 Avro 結構定義的 Confluent Schema Registry 執行個體網址。預設為空白。
  • binaryAvroSchemaPath:Avro 結構定義檔案的 Google Cloud Storage 路徑,用於解碼採用二進位編碼的 Avro 訊息。預設為空白。
  • schemaRegistryAuthenticationMode:結構定義儲存庫驗證模式。可以是 NONE、TLS 或 OAUTH。預設值為 NONE。
  • schemaRegistryTruststoreLocation:SSL 憑證的位置,用於儲存驗證 Schema Registry 的信任存放區。例如:/your-bucket/truststore.jks
  • schemaRegistryTruststorePasswordSecretId:Secret Manager 中的 SecretId,用於儲存存取信任儲存區中密碼的密碼。例如:projects/your-project-number/secrets/your-secret-name/versions/your-secret-version
  • schemaRegistryKeystoreLocation:包含 SSL 憑證和私密金鑰的 KeyStore 位置。例如:/your-bucket/keystore.jks
  • schemaRegistryKeystorePasswordSecretId:密碼管理工具中的 SecretId,用於存取金鑰儲存區檔案。例如 projects/your-project-number/secrets/your-secret-name/versions/your-secret-version
  • schemaRegistryKeyPasswordSecretId:存取用戶端私密金鑰所需的密碼 SecretId,該金鑰儲存在金鑰儲存區中。例如 projects/your-project-number/secrets/your-secret-name/versions/your-secret-version
  • schemaRegistryOauthClientId:用於在 OAUTH 模式中驗證 Schema Registry 用戶端的用戶端 ID。AVRO_CONFLUENT_WIRE_FORMAT 訊息格式的必要欄位。
  • schemaRegistryOauthClientSecretId:Google Cloud Secret Manager 密鑰 ID,內含用於以 OAUTH 模式驗證 Schema Registry 用戶端的用戶端密鑰。AVRO_CONFLUENT_WIRE_FORMAT 訊息格式必須使用此欄位。例如:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • schemaRegistryOauthScope:用於在 OAUTH 模式中驗證 Schema Registry 用戶端的存取權杖範圍。這個欄位為選填,因為要求可以不傳遞範圍參數。例如:openid
  • schemaRegistryOauthTokenEndpointUrl:以 HTTP(S) 為基礎的 OAuth/OIDC 識別資訊提供者網址,用於在 OAUTH 模式中驗證 Schema Registry 用戶端。AVRO_CONFLUENT_WIRE_FORMAT 訊息格式的必要欄位。
  • outputDeadletterTable:失敗訊息的完全合格 BigQuery 資料表名稱。訊息無法到達輸出資料表的所有原因 (例如結構定義不相符、JSON 格式錯誤) 會寫入此資料表。此資料表會由範本建立。例如:your-project-id:your-dataset.your-table-name
  • javascriptTextTransformGcsPath:定義要使用的 JavaScript 使用者定義函式 (UDF) 的 .js 檔案 Cloud Storage URI。例如:gs://my-bucket/my-udfs/my_file.js
  • javascriptTextTransformFunctionName:要使用的 JavaScript 使用者定義函式 (UDF) 名稱。舉例來說,如果您的 JavaScript 函式程式碼是 myTransform(inJson) { /*...do stuff...*/ },則函式名稱就是 myTransform。如需 JavaScript UDF 範例,請參閱 UDF 範例 (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples)。
  • javascriptTextTransformReloadIntervalMinutes:指定重新載入 UDF 的頻率 (以分鐘為單位)。如果值大於 0,Dataflow 會定期檢查 Cloud Storage 中的 UDF 檔案,並在檔案經過修改時重新載入 UDF。您可以在管道執行期間更新 UDF,不必重新啟動工作。如果值為 0,系統會停用 UDF 重新載入功能。預設值為 0

使用者定義函式

您可以視需要撰寫使用者定義函式 (UDF) 來擴充這個範本。範本會針對每個輸入元素呼叫 UDF。元素酬載會序列化為 JSON 字串。詳情請參閱「為 Dataflow 範本建立使用者定義函式」。

範本僅支援 JSON 格式的 Kafka 訊息 UDF。如果 Kafka 訊息使用 Avro 格式,系統不會叫用 UDF。

函式規格

UDF 的規格如下:

  • 輸入:Kafka 記錄值,序列化為 JSON 字串
  • 輸出內容:符合 BigQuery 目的地資料表結構定義的 JSON 字串

執行範本

控制台

  1. 前往 Dataflow 的「Create job from template」(透過範本建立工作) 頁面。
  2. 前往「依據範本建立工作」
  3. 在「工作名稱」欄位中,輸入專屬工作名稱。
  4. 選用:如要使用區域端點,請從下拉式選單中選取值。預設區域為 us-central1

    如需可執行 Dataflow 工作的地區清單,請參閱「Dataflow 位置」。

  5. 從「Dataflow template」(Dataflow 範本) 下拉式選單中選取 the Kafka to BigQuery template。
  6. 在提供的參數欄位中輸入參數值。
  7. 選用:如要從「僅需處理一次」切換至「至少一次」串流模式,請選取「至少一次」
  8. 按一下「Run Job」(執行工作)

gcloud

在殼層或終端機中執行範本:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_BigQuery_Flex \
    --parameters \
readBootstrapServerAndTopic=BOOTSTRAP_SERVER_AND_TOPIC,\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
messageFormat=JSON,\
writeMode=SINGLE_TABLE_NAME,\
outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME,\
useBigQueryDLQ=true,\
outputDeadletterTable=PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME
  

更改下列內容:

  • PROJECT_ID: 您要執行 Dataflow 工作的專案 ID Google Cloud
  • JOB_NAME: 您選擇的不重複工作名稱
  • REGION_NAME: 您要部署 Dataflow 工作的地區,例如 us-central1
  • VERSION: 您要使用的範本版本

    您可以使用下列值:

  • BOOTSTRAP_SERVER_AND_TOPIC:Apache Kafka 啟動伺服器位址和主題

    啟動伺服器位址和主題的格式取決於叢集類型:

    • Managed Service for Apache Kafka 叢集: projects/PROJECT_ID/locations/REGION_NAME/clusters/CLUSTER_NAME/topics/TOPIC_NAME
    • 外部 Kafka 叢集: BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
  • DATASET_NAME:BigQuery 資料集名稱
  • TABLE_NAME:BigQuery 輸出資料表的名稱
  • ERROR_TABLE_NAME:要寫入錯誤記錄的 BigQuery 資料表名稱

API

如要使用 REST API 執行範本,請傳送 HTTP POST 要求。如要進一步瞭解 API 和授權範圍,請參閱 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "readBootstrapServerAndTopic": "BOOTSTRAP_SERVER_AND_TOPIC",
          "kafkaReadAuthenticationMode": "APPLICATION_DEFAULT_CREDENTIALS",
          "messageFormat": "JSON",
          "writeMode": "SINGLE_TABLE_NAME",
          "outputTableSpec": "PROJECT_ID:DATASET_NAME.TABLE_NAME",
          "useBigQueryDLQ": "true",
          "outputDeadletterTable": "PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_BigQuery_Flex",
   }
}
  

更改下列內容:

  • PROJECT_ID: 您要執行 Dataflow 工作的專案 ID Google Cloud
  • JOB_NAME: 您選擇的不重複工作名稱
  • LOCATION: 您要部署 Dataflow 工作的地區,例如 us-central1
  • VERSION: 您要使用的範本版本

    您可以使用下列值:

  • BOOTSTRAP_SERVER_AND_TOPIC:Apache Kafka 啟動伺服器位址和主題

    啟動伺服器位址和主題的格式取決於叢集類型:

    • Managed Service for Apache Kafka 叢集: projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_NAME/topics/TOPIC_NAME
    • 外部 Kafka 叢集: BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
  • DATASET_NAME:BigQuery 資料集名稱
  • TABLE_NAME:BigQuery 輸出資料表的名稱
  • ERROR_TABLE_NAME:要寫入錯誤記錄的 BigQuery 資料表名稱

詳情請參閱「透過 Dataflow 將資料從 Kafka 寫入 BigQuery」。

後續步驟