本頁面說明如何使用 Dataflow 從 Google Cloud Managed Service for Apache Kafka 讀取資料,並將記錄寫入 BigQuery 資料表。本教學課程使用 Apache Kafka 到 BigQuery 範本建立 Dataflow 工作。
總覽
Apache Kafka 是開放原始碼平台,可串流事件。Kafka 通常用於分散式架構,可讓鬆散耦合的元件之間進行通訊。您可以使用 Dataflow 從 Kafka 讀取事件、處理事件,然後將結果寫入 BigQuery 資料表,以進行進一步分析。
Managed Service for Apache Kafka 是一項 Google Cloud 服務,可協助您執行安全且可擴充的 Kafka 叢集。

所需權限
Dataflow工作站服務帳戶必須具備下列 Identity and Access Management (IAM) 角色:
- 代管 Kafka 用戶端 (
roles/managedkafka.client
) - BigQuery 資料編輯者 (
roles/bigquery.dataEditor
)
詳情請參閱「Dataflow 安全性與權限」。
建立 Kafka 叢集
在本步驟中,您將建立 Managed Service for Apache Kafka 叢集。詳情請參閱「建立 Managed Service for Apache Kafka 叢集」。
控制台
前往「Managed Service for Apache Kafka」>「Clusters」頁面。
點選「建立」。
在「叢集名稱」方塊中輸入叢集的名稱。
在「Region」(區域) 清單中,選取叢集的位置。
點選「建立」。
gcloud
使用 managed-kafka clusters create
指令。
gcloud managed-kafka clusters create CLUSTER \
--location=REGION \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME
更改下列內容:
CLUSTER
:叢集名稱REGION
:您建立子網路的區域PROJECT_ID
:您的專案 IDSUBNET_NAME
:要部署叢集的子網路
建立叢集通常需要 20 到 30 分鐘。
建立 Kafka 主題
建立 Managed Service for Apache Kafka 叢集後,請建立主題。
控制台
前往「Managed Service for Apache Kafka」>「Clusters」頁面。
按一下叢集名稱。
在叢集詳細資料頁面中,按一下「建立主題」。
在「主題名稱」方塊中,輸入主題名稱。
點選「建立」。
gcloud
使用 managed-kafka topics create
指令。
gcloud managed-kafka topics create TOPIC_NAME \
--cluster=CLUSTER \
--location=REGION \
--partitions=10 \
--replication-factor=3
更改下列內容:
TOPIC_NAME
:要建立的主題名稱
建立 BigQuery 資料表
在本步驟中,您將建立具有下列結構定義的 BigQuery 資料表:
資料欄名稱 | 資料類型 |
---|---|
name |
STRING |
customer_id |
INTEGER |
如果沒有 BigQuery 資料集,請先建立一個。詳情請參閱建立資料集。接著建立新的空白資料表:
主控台
前往「BigQuery」頁面
在「Explorer」窗格中展開專案,然後選取資料集。
在「資料集」資訊部分,按一下
「建立資料表」。在「Create table from」(建立資料表來源) 清單中,選取「Empty table」(空白資料表)。
在「Table」(資料表) 方塊中,輸入資料表名稱。
在「Schema」(結構定義) 區段中,按一下「以文字形式編輯」。
貼上下列結構定義:
name:STRING, customer_id:INTEGER
點選「建立資料表」。
gcloud
使用 bq mk
指令。
bq mk --table \
PROJECT_ID:DATASET_NAME.TABLE_NAME \
name:STRING,customer_id:INTEGER
更改下列內容:
PROJECT_ID
:您的專案 IDDATASET_NAME
:資料集名稱TABLE_NAME
:要建立的資料表名稱
執行 Dataflow 工作
建立 Kafka 叢集和 BigQuery 資料表後,請執行 Dataflow 範本。
主控台
首先,取得叢集的啟動伺服器位址:
前往 Google Cloud 控制台的「Clusters」(叢集) 頁面。
按一下叢集名稱。
按一下 [設定] 分頁標籤。
從「啟動網址」複製啟動伺服器位址。
接著,執行範本來建立 Dataflow 工作:
前往「Dataflow」>「Jobs」(工作) 頁面。
按一下 [Create job from template] (利用範本建立工作)。
在「Job Name」欄位中輸入
kafka-to-bq
。在「區域端點」中,選取 Managed Service for Apache Kafka 叢集所在的區域。
選取「Kafka to BigQuery」範本。
輸入下列範本參數:
- Kafka 啟動伺服器:啟動伺服器位址
- 來源 Kafka 主題:要讀取的主題名稱
- Kafka 來源驗證模式:
APPLICATION_DEFAULT_CREDENTIALS
- Kafka 訊息格式:
JSON
- 資料表名稱策略:
SINGLE_TABLE_NAME
- BigQuery 輸出資料表:BigQuery 資料表,格式如下:
PROJECT_ID
:DATASET_NAME
.TABLE_NAME
在「無效信件佇列」下方,勾選「將錯誤寫入 BigQuery」。
輸入死信佇列的 BigQuery 資料表名稱,格式如下:
PROJECT_ID
:DATASET_NAME
.ERROR_TABLE_NAME
。請勿事先建立這個資料表。管道會建立這個檔案。
按一下「Run Job」(執行工作)。
gcloud
使用 dataflow flex-template run
指令。
gcloud dataflow flex-template run kafka-to-bq \ --template-file-gcs-location gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \ --region LOCATION \ --parameters \ readBootstrapServerAndTopic=projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC,\ persistKafkaKey=false,\ writeMode=SINGLE_TABLE_NAME,\ kafkaReadOffset=earliest,\ kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\ messageFormat=JSON,\ outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME\ useBigQueryDLQ=true,\ outputDeadletterTable=PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME
請替換下列變數:
LOCATION
:Managed Service for Apache Kafka 所在的區域PROJECT_ID
:您的 Google Cloud 專案名稱CLUSTER_ID
:叢集TOPIC
:Kafka 主題的名稱DATASET_NAME
:資料集名稱TABLE_NAME
:資料表名稱ERROR_TABLE_NAME
:死信佇列的 BigQuery 資料表名稱
請勿預先建立死信佇列的資料表。管道會建立這個檔案。
將訊息傳送至 Kafka
Dataflow 工作啟動後,您就可以將訊息傳送至 Kafka,管道會將訊息寫入 BigQuery。
在與 Kafka 叢集相同的子網路中建立 VM,並安裝 Kafka 指令列工具。如需詳細操作說明,請參閱「使用 CLI 發布及接收訊息」中的「設定用戶端機器」。
執行下列指令,將訊息寫入 Kafka 主題:
kafka-console-producer.sh \ --topic TOPIC \ --bootstrap-server bootstrap.CLUSTER_ID.LOCATION.managedkafka.PROJECT_ID.cloud.goog:9092 \ --producer.config client.properties
請替換下列變數:
TOPIC
:Kafka 主題的名稱CLUSTER_ID
:叢集名稱LOCATION
:叢集所在的區域PROJECT_ID
:您的 Google Cloud 專案名稱
在提示中輸入下列幾行文字,將訊息傳送至 Kafka:
{"name": "Alice", "customer_id": 1} {"name": "Bob", "customer_id": 2} {"name": "Charles", "customer_id": 3}
使用無效信件佇列
工作執行期間,管道可能無法將個別訊息寫入 BigQuery。可能發生的錯誤包括:
- 序列化錯誤,包括格式錯誤的 JSON。
- 類型轉換錯誤,是由於資料表結構定義與 JSON 資料不符所致。
- JSON 資料中不在表格結構定義中的額外欄位。
這些錯誤不會導致工作失敗,也不會顯示為 Dataflow 工作記錄中的錯誤。管道會改用死信佇列處理這類錯誤。
如要在執行範本時啟用無法傳送的訊息佇列,請設定下列範本參數:
useBigQueryDLQ
:true
outputDeadletterTable
:BigQuery 資料表的完整名稱,例如my-project:dataset1.errors
管道會自動建立資料表。如果處理 Kafka 訊息時發生錯誤,管道會將錯誤項目寫入資料表。
錯誤訊息範例:
錯誤類型 | 事件資料 | errorMessage |
---|---|---|
序列化錯誤 | 「Hello world」 | 無法將 JSON 序列化為表格列:「Hello world」 |
類型轉換錯誤 | {"name":"Emily","customer_id":"abc"} | { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "Cannot convert value to integer (bad value): abc", "reason" : "invalid" } ], "index" : 0 } |
不明欄位 | {"name":"Zoe","age":34} | { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "no such field: customer_id.", "reason" : "invalid" } ], "index" : 0 } |
使用 BigQuery 資料類型
在內部,Kafka I/O 連接器會將 JSON 訊息酬載轉換為 Apache Beam TableRow
物件,並將 TableRow
欄位值轉換為 BigQuery 型別。
下表顯示 BigQuery 資料類型的 JSON 表示法。
BigQuery 類型 | JSON 表示法 |
---|---|
ARRAY |
[1.2,3] |
BOOL |
true |
DATE |
"2022-07-01" |
DATETIME |
"2022-07-01 12:00:00.00" |
DECIMAL |
5.2E11 |
FLOAT64 |
3.142 |
GEOGRAPHY |
"POINT(1 2)" 使用 well-known text (WKT) 或 GeoJSON 指定地理位置,格式為字串。詳情請參閱「載入地理空間資料」。 |
INT64 |
10 |
INTERVAL |
"0-13 370 48:61:61" |
STRING |
"string_val" |
TIMESTAMP |
"2022-07-01T12:00:00.00Z" 使用 JavaScript |
結構化資料
如果 JSON 訊息遵循一致的結構定義,您可以在 BigQuery 中使用 STRUCT
資料類型表示 JSON 物件。
在下列範例中,answers
欄位是具有兩個子欄位 (a
和 b
) 的 JSON 物件:
{"name":"Emily","answers":{"a":"yes","b":"no"}}
下列 SQL 陳述式會建立具有相容結構定義的 BigQuery 資料表:
CREATE TABLE my_dataset.kafka_events (name STRING, answers STRUCT<a STRING, b STRING>);
產生的資料表如下所示:
+-------+----------------------+
| name | answers |
+-------+----------------------+
| Emily | {"a":"yes","b":"no"} |
+-------+----------------------+
半結構化資料
如果 JSON 訊息不符合嚴格的結構定義,請考慮以 JSON
資料類型將訊息儲存在 BigQuery 中。將 JSON 資料儲存為 JSON
型別,就不需要預先定義結構定義。資料擷取完畢後,您可以使用 GoogleSQL 中的欄位存取 (點記號) 和陣列存取運算子查詢資料。詳情請參閱在 GoogleSQL 中使用 JSON 資料。
使用 UDF 轉換資料
本教學課程假設 Kafka 訊息的格式為 JSON,且 BigQuery 表格結構定義與 JSON 資料相符,資料未經過任何轉換。
您可以選擇提供 JavaScript 使用者定義函式 (UDF),在資料寫入 BigQuery 前轉換資料。UDF 也可以執行其他處理作業,例如篩選、移除個人識別資訊 (PII),或使用其他欄位擴充資料。
詳情請參閱「為 Dataflow 範本建立使用者定義函式」。