配置 BigQuery 目标位置

本页介绍如何配置 BigQuery 目标位置,以使用 Datastream 从源数据库流式传输数据。

配置目标数据集

为 BigQuery 目标位置配置数据集时,您可以选择以下任一选项:

  • 为每个架构创建一个数据集:系统会根据源的架构名称,在指定的 BigQuery 位置中选择或创建数据集。因此,对于来源中的每个架构,Datastream 会在 BigQuery 中自动创建一个数据集。

    例如,如果您有一个 MySQL 来源,并且该来源包含一个 mydb 数据库以及该数据库中的一个 employees 表,那么 Datastream 会在 BigQuery 中创建 mydb 数据集和 employees 表。

    如果您选择此选项,Datastream 会在包含该流的项目中创建数据集。虽然您无需在与数据流相同的区域中创建数据集,但为了优化费用和性能,我们建议您将数据流的所有资源以及数据集都放在同一区域中。

  • 所有架构共用一个数据集:您可以为数据流选择一个 BigQuery 数据集。Datastream 会将所有数据流式传输到此数据集。对于您选择的数据集,Datastream 会以 <schema>_<table> 的形式创建所有表。

    例如,如果您有一个 MySQL 来源,并且该来源包含一个 mydb 数据库和该数据库中的一个 employees 表,那么 Datastream 会在您选择的数据集中创建 mydb_employees 表。

写入行为

  • 将数据流式传输到 BigQuery 时,事件的最大大小为 20 MB。

  • 配置数据流时,您可以选择 Datastream 将更改数据写入 BigQuery 的方式。如需了解详情,请参阅配置写入模式

配置写入模式

您可以使用以下两种模式来定义希望如何将数据写入 BigQuery:

  • 合并:这是默认的写入模式。选择此选项后,BigQuery 会反映数据在源数据库中的存储方式。这意味着,Datastream 会将数据的所有更改写入 BigQuery,然后 BigQuery 会将这些更改与现有数据合并,从而创建源表的副本作为最终表。在合并模式下,系统不会保留更改事件的任何历史记录。例如,如果您先插入一行,然后更新该行,BigQuery 只会保留更新后的数据。如果您随后从源表中删除相应行,BigQuery 将不再保留该行的任何记录。
  • 仅追加:仅追加写入模式允许您以变更数据流(INSERTUPDATE-INSERTUPDATE-DELETEDELETE 事件)的形式向 BigQuery 添加数据。如果您需要保留数据的历史状态,请使用此模式。 为了更好地了解仅附加写入模式,请考虑以下场景:
    • 初始回填:初始回填完成后,所有事件都会以 INSERT 类型事件写入 BigQuery,并具有相同的时间戳、通用唯一标识符 (UUID) 和更改序列号。
    • 主键更新:当主键发生更改时,系统会将两行数据写入 BigQuery:
      • 包含原始主键的 UPDATE-DELETE
      • 包含新主密钥的 UPDATE-INSERT
    • 行更新:当您更新某一行时,系统会将单个 UPDATE-INSERT 行写入 BigQuery
    • 删除行:当您删除某一行时,系统会将一个 DELETE 行写入 BigQuery

表元数据

Datastream 会向写入 BigQuery 目标平台的每个表中附加一个名为 datastream_metadataSTRUCT 列。

合并写入模式

如果表在来源中具有主键,则该列包含以下字段:

  • UUID:此字段的数据类型为 STRING
  • SOURCE_TIMESTAMP:此字段的数据类型为 INTEGER

如果表没有主键,则该列包含一个额外的字段:IS_DELETED。此字段的数据类型为 BOOLEAN,用于指示 Datastream 流式传输到目标位置的数据是否与源中的 DELETE 操作相关联。没有主键的表是仅附加的。

仅附加写入模式

datastream_metadata 列包含具有主键和不具有主键的表的相同字段:

  • UUID:此字段的数据类型为 STRING
  • SOURCE_TIMESTAMP:此字段的数据类型为 INTEGER
  • CHANGE_SEQUENCE_NUMBER:此字段的数据类型为 STRING。这是 Datastream 为每个变更事件使用的内部序列号。
  • CHANGE_TYPE:此字段的数据类型为 STRING。它表示更改事件的类型:INSERTUPDATE-INSERTUPDATE-DELETEDELETE
  • SORT_KEYS:此字段包含一个 STRING 值数组。您可以使用这些值对更改事件进行排序。

使用 max_staleness 选项搭配 BigQuery 表

在近乎实时的数据提取过程中,Datastream 会使用 BigQuery 对更新插入操作(例如更新、插入和删除数据)的内置支持。借助 upsert 操作,您可以在添加、修改或删除行时动态更新 BigQuery 目标。Datastream 使用 BigQuery Storage Write API 将这些 upsert 操作流式传输到目标表中。

指定数据过时期限

BigQuery 会根据配置的数据过时限制,持续或在查询运行时在后台应用源修改。当 Datastream 在 BigQuery 中创建新表时,该表的 max_staleness 选项会根据数据流的当前数据过时限制值进行设置。

如需详细了解如何将 BigQuery 表与 max_staleness 选项搭配使用,请参阅表过时程度

控制 BigQuery 费用

BigQuery 费用与 Datastream 费用分开收取。如需了解如何控制 BigQuery 费用,请参阅 BigQuery CDC 价格

地图数据类型

下表列出了从支持的源数据库到 BigQuery 目标的数据类型转换。


源数据库 源数据类型 BigQuery 数据类型
MySQL BIGINT(size) INT64
MySQL BIGINT (unsigned) DECIMAL
MySQL BINARY(size) STRING (hex encoded)
MySQL BIT(size) INT64
MySQL BLOB(size) STRING (hex encoded)
MySQL BOOL INT64
MySQL CHAR(size) STRING
MySQL DATE DATE
MySQL DATETIME(fsp) DATETIME
MySQL DECIMAL(precision, scale) 如果精度值 <=38,且小数位数值 <=9,则为 NUMERIC。否则为 BIGNUMERIC
MySQL DOUBLE(size, d) FLOAT64
MySQL ENUM(val1, val2, val3, ...) STRING
MySQL FLOAT(precision) FLOAT64
MySQL FLOAT(size, d) FLOAT64
MySQL INTEGER(size) INT64
MySQL INTEGER (unsigned) INT64
MySQL

JSON

JSON 支持
MySQL LONGBLOB STRING (hex encoded)
MySQL LONGTEXT STRING
MySQL MEDIUMBLOB STRING (hex encoded)
MySQL MEDIUMINT(size) INT64
MySQL MEDIUMTEXT STRING
MySQL SET(val1, val2, val3, ...) STRING
MySQL SMALLINT(size) INT64
MySQL TEXT(size) STRING
MySQL TIME(fsp) INTERVAL
MySQL TIMESTAMP(fsp) TIMESTAMP
MySQL TINYBLOB STRING (hex encoded)
MySQL TINYINT(size) INT64
MySQL TINYTEXT STRING
MySQL VARBINARY(size) STRING (hex encoded)
MySQL VARCHAR STRING
MySQL YEAR INT64
Oracle ANYDATA UNSUPPORTED
Oracle BFILE STRING
Oracle BINARY DOUBLE FLOAT64
Oracle BINARY FLOAT FLOAT64
Oracle BLOB BYTES
Oracle CHAR STRING
Oracle CLOB STRING
Oracle DATE DATETIME
Oracle DOUBLE PRECISION FLOAT64
Oracle FLOAT(p) FLOAT64
Oracle INTERVAL DAY TO SECOND UNSUPPORTED
Oracle INTERVAL YEAR TO MONTH UNSUPPORTED
Oracle LONG/LONG RAW STRING
Oracle NCHAR STRING
Oracle NCLOB STRING
Oracle NUMBER STRING
Oracle NUMBER(precision=*) STRING
Oracle NUMBER(precision, scale<=0) 如果 p<=18,则 INT64。如果 18<p=<78,则映射到参数化的小数类型。如果 p>=79,则映射到 STRING
Oracle NUMBER(precision, scale>0) 如果 0<p=<78,则映射到参数化的小数类型。如果 p>=79,则映射到 STRING
Oracle NVARCHAR2 STRING
Oracle RAW STRING
Oracle ROWID STRING
Oracle SDO_GEOMETRY UNSUPPORTED
Oracle SMALLINT INT64
Oracle TIMESTAMP TIMESTAMP
Oracle TIMESTAMP WITH TIME ZONE TIMESTAMP
Oracle UDT (user-defined type) UNSUPPORTED
Oracle UROWID STRING
Oracle VARCHAR STRING
Oracle VARCHAR2 STRING
Oracle XMLTYPE UNSUPPORTED
PostgreSQL ARRAY JSON
PostgreSQL BIGINT INT64
PostgreSQL BIT BYTES
PostgreSQL BIT_VARYING BYTES
PostgreSQL BOOLEAN BOOLEAN
PostgreSQL BOX UNSUPPORTED
PostgreSQL BYTEA BYTES
PostgreSQL CHARACTER STRING
PostgreSQL CHARACTER_VARYING STRING
PostgreSQL CIDR STRING
PostgreSQL CIRCLE UNSUPPORTED
PostgreSQL DATE DATE
PostgreSQL DOUBLE_PRECISION FLOAT64
PostgreSQL ENUM STRING
PostgreSQL INET STRING
PostgreSQL INTEGER INT64
PostgreSQL INTERVAL INTERVAL
PostgreSQL JSON JSON
PostgreSQL JSONB JSON
PostgreSQL LINE UNSUPPORTED
PostgreSQL LSEG UNSUPPORTED
PostgreSQL MACADDR STRING
PostgreSQL MONEY FLOAT64
PostgreSQL NUMERIC 如果精度 = -1,则为 STRING(BigQuery NUMERIC 类型需要固定精度)。否则为 BIGNUMERIC/NUMERIC。如需了解详情,请参阅 PostgreSQL 文档中的任意精度数字部分。
PostgreSQL OID INT64
PostgreSQL PATH UNSUPPORTED
PostgreSQL POINT UNSUPPORTED
PostgreSQL POLYGON UNSUPPORTED
PostgreSQL REAL FLOAT64
PostgreSQL SMALLINT INT64
PostgreSQL SMALLSERIAL INT64
PostgreSQL SERIAL INT64
PostgreSQL TEXT STRING
PostgreSQL TIME TIME
PostgreSQL TIMESTAMP TIMESTAMP
PostgreSQL TIMESTAMP_WITH_TIMEZONE TIMESTAMP
PostgreSQL TIME_WITH_TIMEZONE TIME
PostgreSQL TSQUERY STRING
PostgreSQL TSVECTOR STRING
PostgreSQL TXID_SNAPSHOT STRING
PostgreSQL UUID STRING
PostgreSQL XML STRING
SQL Server BIGINT INT64
SQL Server BINARY BYTES
SQL Server BIT BOOL
SQL Server CHAR STRING
SQL Server DATE DATE
SQL Server DATETIME2 DATETIME
SQL Server DATETIME DATETIME
SQL Server DATETIMEOFFSET TIMESTAMP
SQL Server DECIMAL BIGNUMERIC
SQL Server FLOAT FLOAT64
SQL Server IMAGE BYTES
SQL Server INT INT64
SQL Server MONEY BIGNUMERIC
SQL Server NCHAR STRING
SQL Server NTEXT STRING
SQL Server NUMERIC BIGNUMERIC
SQL Server NVARCHAR STRING
SQL Server NVARCHAR(MAX) STRING
SQL Server REAL FLOAT64
SQL Server SMALLDATETIME DATETIME
SQL Server SMALLINT INT64
SQL Server SMALLMONEY NUMERIC
SQL Server TEXT STRING
SQL Server TIME TIME
SQL Server TIMESTAMP/ROWVERSION BYTES
SQL Server TINYINT INT64
SQL Server UNIQUEIDENTIFIER STRING
SQL Server VARBINARY BYTES
SQL Server VARBINARY(MAX) BYTES
SQL Server VARCHAR STRING
SQL Server VARCHAR(MAX) STRING
SQL Server XML STRING
Salesforce BOOLEAN BOOLEAN
Salesforce BYTE BYTES
Salesforce DATE DATE
Salesforce DATETIME DATETIME
Salesforce DOUBLE BIGNUMERIC
Salesforce INT INT64
Salesforce STRING STRING
Salesforce TIME TIME
Salesforce ANYTYPE(可以是 STRINGDATENUMBERBOOLEAN STRING
Salesforce COMBOBOX STRING
Salesforce CURRENCY FLOAT64

允许的最大长度为 18 位数。

Salesforce DATACATEGORYGROUPREFERENCE STRING
Salesforce EMAIL STRING
Salesforce ENCRYPTEDSTRING STRING
Salesforce ID STRING
Salesforce JUNCTIONIDLIST STRING
Salesforce MASTERRECORD STRING
Salesforce MULTIPICKLIST STRING
Salesforce PERCENT FLOAT64

允许的最大长度为 18 位数。

Salesforce PHONE STRING
Salesforce PICKLIST STRING
Salesforce REFERENCE STRING
Salesforce TEXTAREA STRING

允许的长度上限为 255 个字符。

Salesforce URL STRING

MongoDB 数据类型

MongoDB 二进制 JSON (BSON) 文档以 MongoDB Extended JSON (v1) 严格模式格式写入 BigQuery。下表显示了数据类型在 BigQuery 中的表示方式以及示例值。

源数据类型示例值BigQuery JSON 类型值
DOUBLE 3.1415926535 3.1415926535
STRING"Hello, MongoDB!""Hello, MongoDB!"
ARRAY
[
    "item1",
    123,
    true,
    { subItem: "object in array" }
  ]
    
["item1",123,true,{"subItem":"object in array"}]
BINARY DATA new BinData(0, "SGVsbG8gQmluYXJ5IERhdGE=") {"$binary":"SGVsbG8gQmluYXJ5IERhdGE=","$type":"00"}
BOOLEANtruetrue
DATE 2024-12-25T10:30:00.000+00:00 {"$date": 1735122600000}
NULLnullnull
REGEX/^mongo(db)?$/i{"$options":"i","$regex":"^mongo(db)?$"}
JAVASCRIPTfunction() {return this.stringField.length;}{"$code":"function() {\n return this.stringField.length;\n }"}
DECIMAL128NumberDecimal("1234567890.1234567890"){"$numberDecimal":"1234567890.1234567890"}
OBJECTIDObjectId('673c5d8dbfe2e51808cc2c3d'){"$oid": "673c5d8dbfe2e51808cc2c3d"}
LONG3567587327{"$numberLong": "3567587327"}
INT324242
INT641864712049423024127{"$numberLong": "1864712049423024127"}
TIMESTAMPnew Timestamp(1747888877, 1){"$timestamp":{"i":1,"t":1747888877}}

将 PostgreSQL 数组作为 BigQuery 数组数据类型进行查询

如果您希望将 PostgreSQL 数组作为 BigQuery ARRAY 数据类型进行查询,可以使用 BigQuery JSON_VALUE_ARRAY 函数将 JSON 值转换为 BigQuery 数组:

  SELECT ARRAY(SELECT CAST(element AS TYPE) FROM UNNEST(JSON_VALUE_ARRAY(BQ_COLUMN_NAME,'$')) AS element) AS array_col
  

替换以下内容:

  • TYPE:与 PostgreSQL 源数组中的元素类型匹配的 BigQuery 类型。例如,如果源类型是 BIGINT 值数组,则将 TYPE 替换为 INT64

    如需详细了解如何映射数据类型,请参阅映射数据类型

  • BQ_COLUMN_NAME:BigQuery 表中相关列的名称。

以下两种情况例外:

  • 对于源列中 BITBIT_VARYINGBYTEA 值的数组,请运行以下查询:

    SELECT ARRAY(SELECT FROM_BASE64(element) FROM UNNEST(JSON_VALUE_ARRAY(BQ_COLUMN_NAME,'$')) AS element) AS array_of_bytes
  • 对于源列中 JSONJSONB 值的数组,请使用 JSON_QUERY_ARRAY 函数:

    SELECT ARRAY(SELECT element FROM UNNEST(JSON_QUERY_ARRAY(BQ_COLUMN_NAME,'$')) AS element) AS array_of_jsons

已知限制

将 BigQuery 用作目标位置的已知限制包括:

  • 您只能将数据复制到与 Datastream 流位于同一 Google Cloud 项目中的 BigQuery 数据集。
  • 默认情况下,Datastream 不支持向已复制到 BigQuery 且没有主键的表添加主键,也不支持从已复制到 BigQuery 且有主键的表中移除主键。如果您需要进行此类更改,请与 Google 支持团队联系。如需了解如何更改已具有主键的源表的主键定义,请参阅诊断问题
  • BigQuery 中的主键必须采用以下数据类型:

    • DATE
    • BOOL
    • GEOGRAPHY
    • INT64
    • NUMERIC
    • BIGNUMERIC
    • STRING
    • TIMESTAMP
    • DATETIME

    包含不受支持的数据类型的主键的表不会被 Datastream 复制。

  • BigQuery 不支持包含 .$/@+ 字符的表名。Datastream 在创建目标表时会将此类字符替换为下划线。

    例如,源数据库中的 table.name 在 BigQuery 中会变为 table_name

    如需详细了解 BigQuery 中的表名称,请参阅表命名

  • BigQuery 不支持超过 4 个聚簇列。复制主键列超过 4 列的表时,Datastream 会使用 4 个主键列作为聚簇列。

  • Datastream 会将超出范围的日期和时间字面量(例如 PostgreSQL 无限日期类型)映射到以下值:

    • 正值:DATE9999-12-31
    • 将负 DATE 更改为 0001-01-01 的值
    • 正值:TIMESTAMP9999-12-31 23:59:59.999000 UTC
    • 将负 TIMESTAMP 更改为 0001-01-01 00:00:00 UTC 的值
  • BigQuery 不支持主键数据类型为 FLOATREAL 的流式表。此类表不会被复制。 如需详细了解 BigQuery 日期类型和范围,请参阅数据类型

  • 如果您的数据源是 Salesforce,则不支持每个架构的数据集配置选项。

后续步骤