使用 Dataflow 將資料從 Kafka 寫入 BigQuery

本頁面說明如何使用 Dataflow 從 Google Cloud Managed Service for Apache Kafka 讀取資料,並將記錄寫入 BigQuery 資料表。本教學課程使用 Apache Kafka 到 BigQuery 範本建立 Dataflow 工作。

總覽

Apache Kafka 是開放原始碼平台,可串流事件。Kafka 通常用於分散式架構,可讓鬆散耦合的元件之間進行通訊。您可以使用 Dataflow 從 Kafka 讀取事件、處理事件,然後將結果寫入 BigQuery 資料表,以進行進一步分析。

Managed Service for Apache Kafka 是一項 Google Cloud 服務,可協助您執行安全且可擴充的 Kafka 叢集。

將 Kafka 事件讀取至 BigQuery
使用 Apache Kafka 的事件驅動架構

所需權限

Dataflow工作站服務帳戶必須具備下列 Identity and Access Management (IAM) 角色:

  • 代管 Kafka 用戶端 (roles/managedkafka.client)
  • BigQuery 資料編輯者 (roles/bigquery.dataEditor)

詳情請參閱「Dataflow 安全性與權限」。

建立 Kafka 叢集

在本步驟中,您將建立 Managed Service for Apache Kafka 叢集。詳情請參閱「建立 Managed Service for Apache Kafka 叢集」。

控制台

  1. 前往「Managed Service for Apache Kafka」>「Clusters」頁面。

    前往「Clusters」(叢集)

  2. 點選「建立」

  3. 在「叢集名稱」方塊中輸入叢集的名稱。

  4. 在「Region」(區域) 清單中,選取叢集的位置。

  5. 點選「建立」

gcloud

使用 managed-kafka clusters create 指令。

gcloud managed-kafka clusters create CLUSTER \
--location=REGION \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME

更改下列內容:

  • CLUSTER:叢集名稱
  • REGION:您建立子網路的區域
  • PROJECT_ID:您的專案 ID
  • SUBNET_NAME:要部署叢集的子網路

建立叢集通常需要 20 到 30 分鐘。

建立 Kafka 主題

建立 Managed Service for Apache Kafka 叢集後,請建立主題。

控制台

  1. 前往「Managed Service for Apache Kafka」>「Clusters」頁面。

    前往「Clusters」(叢集)

  2. 按一下叢集名稱。

  3. 在叢集詳細資料頁面中,按一下「建立主題」

  4. 在「主題名稱」方塊中,輸入主題名稱。

  5. 點選「建立」

gcloud

使用 managed-kafka topics create 指令。

gcloud managed-kafka topics create TOPIC_NAME \
--cluster=CLUSTER \
--location=REGION \
--partitions=10 \
--replication-factor=3

更改下列內容:

  • TOPIC_NAME:要建立的主題名稱

建立 BigQuery 資料表

在本步驟中,您將建立具有下列結構定義的 BigQuery 資料表:

資料欄名稱 資料類型
name STRING
customer_id INTEGER

如果沒有 BigQuery 資料集,請先建立一個。詳情請參閱建立資料集。接著建立新的空白資料表:

主控台

  1. 前往「BigQuery」頁面

    前往 BigQuery

  2. 在「Explorer」窗格中展開專案,然後選取資料集。

  3. 在「資料集」資訊部分,按一下 「建立資料表」

  4. 在「Create table from」(建立資料表來源) 清單中,選取「Empty table」(空白資料表)

  5. 在「Table」(資料表) 方塊中,輸入資料表名稱。

  6. 在「Schema」(結構定義) 區段中,按一下「以文字形式編輯」

  7. 貼上下列結構定義:

    name:STRING,
    customer_id:INTEGER
    
  8. 點選「建立資料表」。

gcloud

使用 bq mk 指令。

bq mk --table \
  PROJECT_ID:DATASET_NAME.TABLE_NAME \
  name:STRING,customer_id:INTEGER

更改下列內容:

  • PROJECT_ID:您的專案 ID
  • DATASET_NAME:資料集名稱
  • TABLE_NAME:要建立的資料表名稱

執行 Dataflow 工作

建立 Kafka 叢集和 BigQuery 資料表後,請執行 Dataflow 範本。

主控台

首先,取得叢集的啟動伺服器位址:

  1. 前往 Google Cloud 控制台的「Clusters」(叢集) 頁面。

    前往「Clusters」(叢集)

  2. 按一下叢集名稱。

  3. 按一下 [設定] 分頁標籤。

  4. 從「啟動網址」複製啟動伺服器位址。

接著,執行範本來建立 Dataflow 工作:

  1. 前往「Dataflow」>「Jobs」(工作) 頁面。

    前往「Jobs」(工作) 頁面

  2. 按一下 [Create job from template] (利用範本建立工作)

  3. 在「Job Name」欄位中輸入 kafka-to-bq

  4. 在「區域端點」中,選取 Managed Service for Apache Kafka 叢集所在的區域。

  5. 選取「Kafka to BigQuery」範本。

  6. 輸入下列範本參數:

    • Kafka 啟動伺服器:啟動伺服器位址
    • 來源 Kafka 主題:要讀取的主題名稱
    • Kafka 來源驗證模式APPLICATION_DEFAULT_CREDENTIALS
    • Kafka 訊息格式JSON
    • 資料表名稱策略SINGLE_TABLE_NAME
    • BigQuery 輸出資料表:BigQuery 資料表,格式如下:PROJECT_ID:DATASET_NAME.TABLE_NAME
  7. 在「無效信件佇列」下方,勾選「將錯誤寫入 BigQuery」

  8. 輸入死信佇列的 BigQuery 資料表名稱,格式如下:PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME

    請勿事先建立這個資料表。管道會建立這個檔案。

  9. 按一下「Run Job」(執行工作)

gcloud

使用 dataflow flex-template run 指令。

gcloud dataflow flex-template run kafka-to-bq \
--template-file-gcs-location gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \
--region LOCATION \
--parameters \
readBootstrapServerAndTopic=projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC,\
persistKafkaKey=false,\
writeMode=SINGLE_TABLE_NAME,\
kafkaReadOffset=earliest,\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
messageFormat=JSON,\
outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME\
useBigQueryDLQ=true,\
outputDeadletterTable=PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME

請替換下列變數:

  • LOCATION:Managed Service for Apache Kafka 所在的區域
  • PROJECT_ID:您的 Google Cloud 專案名稱
  • CLUSTER_ID:叢集
  • TOPIC:Kafka 主題的名稱
  • DATASET_NAME:資料集名稱
  • TABLE_NAME:資料表名稱
  • ERROR_TABLE_NAME:死信佇列的 BigQuery 資料表名稱

請勿預先建立死信佇列的資料表。管道會建立這個檔案。

將訊息傳送至 Kafka

Dataflow 工作啟動後,您就可以將訊息傳送至 Kafka,管道會將訊息寫入 BigQuery。

  1. 在與 Kafka 叢集相同的子網路中建立 VM,並安裝 Kafka 指令列工具。如需詳細操作說明,請參閱「使用 CLI 發布及接收訊息」中的「設定用戶端機器」。

  2. 執行下列指令,將訊息寫入 Kafka 主題:

    kafka-console-producer.sh \
     --topic TOPIC \
     --bootstrap-server bootstrap.CLUSTER_ID.LOCATION.managedkafka.PROJECT_ID.cloud.goog:9092 \
     --producer.config client.properties

    請替換下列變數:

    • TOPIC:Kafka 主題的名稱
    • CLUSTER_ID:叢集名稱
    • LOCATION:叢集所在的區域
    • PROJECT_ID:您的 Google Cloud 專案名稱
  3. 在提示中輸入下列幾行文字,將訊息傳送至 Kafka:

    {"name": "Alice", "customer_id": 1}
    {"name": "Bob", "customer_id": 2}
    {"name": "Charles", "customer_id": 3}
    

使用無效信件佇列

工作執行期間,管道可能無法將個別訊息寫入 BigQuery。可能發生的錯誤包括:

  • 序列化錯誤,包括格式錯誤的 JSON。
  • 類型轉換錯誤,是由於資料表結構定義與 JSON 資料不符所致。
  • JSON 資料中不在表格結構定義中的額外欄位。

這些錯誤不會導致工作失敗,也不會顯示為 Dataflow 工作記錄中的錯誤。管道會改用死信佇列處理這類錯誤。

如要在執行範本時啟用無法傳送的訊息佇列,請設定下列範本參數:

  • useBigQueryDLQtrue
  • outputDeadletterTable:BigQuery 資料表的完整名稱,例如 my-project:dataset1.errors

管道會自動建立資料表。如果處理 Kafka 訊息時發生錯誤,管道會將錯誤項目寫入資料表。

錯誤訊息範例:

錯誤類型 事件資料 errorMessage
序列化錯誤 「Hello world」 無法將 JSON 序列化為表格列:「Hello world」
類型轉換錯誤 {"name":"Emily","customer_id":"abc"} { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "Cannot convert value to integer (bad value): abc", "reason" : "invalid" } ], "index" : 0 }
不明欄位 {"name":"Zoe","age":34} { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "no such field: customer_id.", "reason" : "invalid" } ], "index" : 0 }

使用 BigQuery 資料類型

在內部,Kafka I/O 連接器會將 JSON 訊息酬載轉換為 Apache Beam TableRow 物件,並將 TableRow 欄位值轉換為 BigQuery 型別。

下表顯示 BigQuery 資料類型的 JSON 表示法。

BigQuery 類型 JSON 表示法
ARRAY [1.2,3]
BOOL true
DATE "2022-07-01"
DATETIME "2022-07-01 12:00:00.00"
DECIMAL 5.2E11
FLOAT64 3.142
GEOGRAPHY "POINT(1 2)"

使用 well-known text (WKT) 或 GeoJSON 指定地理位置,格式為字串。詳情請參閱「載入地理空間資料」。

INT64 10
INTERVAL "0-13 370 48:61:61"
STRING "string_val"
TIMESTAMP "2022-07-01T12:00:00.00Z"

使用 JavaScript Date.toJSON 方法設定值的格式。

結構化資料

如果 JSON 訊息遵循一致的結構定義,您可以在 BigQuery 中使用 STRUCT 資料類型表示 JSON 物件。

在下列範例中,answers 欄位是具有兩個子欄位 (ab) 的 JSON 物件:

{"name":"Emily","answers":{"a":"yes","b":"no"}}

下列 SQL 陳述式會建立具有相容結構定義的 BigQuery 資料表:

CREATE TABLE my_dataset.kafka_events (name STRING, answers STRUCT<a STRING, b STRING>);

產生的資料表如下所示:

+-------+----------------------+
| name  |       answers        |
+-------+----------------------+
| Emily | {"a":"yes","b":"no"} |
+-------+----------------------+

半結構化資料

如果 JSON 訊息不符合嚴格的結構定義,請考慮以 JSON 資料類型將訊息儲存在 BigQuery 中。將 JSON 資料儲存為 JSON 型別,就不需要預先定義結構定義。資料擷取完畢後,您可以使用 GoogleSQL 中的欄位存取 (點記號) 和陣列存取運算子查詢資料。詳情請參閱在 GoogleSQL 中使用 JSON 資料

使用 UDF 轉換資料

本教學課程假設 Kafka 訊息的格式為 JSON,且 BigQuery 表格結構定義與 JSON 資料相符,資料未經過任何轉換。

您可以選擇提供 JavaScript 使用者定義函式 (UDF),在資料寫入 BigQuery 前轉換資料。UDF 也可以執行其他處理作業,例如篩選、移除個人識別資訊 (PII),或使用其他欄位擴充資料。

詳情請參閱「為 Dataflow 範本建立使用者定義函式」。

後續步驟