本文档介绍如何使用旧式 tabledata.insertAll
方法将数据流式传输到 BigQuery 中。
对于新项目,我们建议使用 BigQuery Storage Write API,而不是使用 tabledata.insertAll
方法。Storage Write API 价格更低且功能更强大,包括“正好一次”传送语义。如果您要将现有项目从 tabledata.insertAll
方法迁移到 Storage Write API,我们建议您选择默认流。tabledata.insertAll
方法仍然完全受支持。
准备工作
确保您对包含目标表的数据集具有写入权限。除非使用的是模板表,否则在开始向目标表写入数据之前该表就必须存在。如需详细了解模板表,请参阅使用模板表自动创建表。
查看流式传输数据的配额政策。
-
Make sure that billing is enabled for your Google Cloud project.
授予为用户提供执行本文档中的每个任务所需权限的 Identity and Access Management (IAM) 角色。
不支持通过免费层级流式传输。如果您在未启用结算功能的情况下尝试使用流式传输,则会收到以下错误:BigQuery: Streaming insert is not allowed in the free tier.
所需权限
如需将数据流式传输到 BigQuery 中,您需要拥有以下 IAM 权限:
bigquery.tables.updateData
(可让您在表中插入数据)bigquery.tables.get
(可让您获取表元数据)bigquery.datasets.get
(可让您获取数据集元数据)bigquery.tables.create
(如果您使用模板表自动创建表,则为必需)
以下预定义 IAM 角色都包含将数据流式传输到 BigQuery 中所需的权限:
roles/bigquery.dataEditor
roles/bigquery.dataOwner
roles/bigquery.admin
如需详细了解 BigQuery 中的 IAM 角色和权限,请参阅预定义的角色和权限。
将数据流式插入到 BigQuery
C#
试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 C# 设置说明进行操作。 如需了解详情,请参阅 BigQuery C# API 参考文档。
如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证。
Go
试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 Go 设置说明进行操作。 如需了解详情,请参阅 BigQuery Go API 参考文档。
如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证。
Java
试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 Java 设置说明进行操作。 如需了解详情,请参阅 BigQuery Java API 参考文档。
如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证。
Node.js
试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 Node.js 设置说明进行操作。 如需了解详情,请参阅 BigQuery Node.js API 参考文档。
如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证。
PHP
试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 PHP 设置说明进行操作。 如需了解详情,请参阅 BigQuery PHP API 参考文档。
如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证。
Python
试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 Python 设置说明进行操作。 如需了解详情,请参阅 BigQuery Python API 参考文档。
如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证。
Ruby
试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 Ruby 设置说明进行操作。 如需了解详情,请参阅 BigQuery Ruby API 参考文档。
如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证。
插入行时无需填充 insertID
字段。以下示例展示了如何避免在流式插入数据时为每行发送 insertID
。
Java
试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 Java 设置说明进行操作。 如需了解详情,请参阅 BigQuery Java API 参考文档。
如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证。
Python
试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 Python 设置说明进行操作。 如需了解详情,请参阅 BigQuery Python API 参考文档。
如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证。
发送日期和时间数据
对于日期和时间字段,请按如下所示在 tabledata.insertAll
方法中设置数据的格式:
类型 | 格式 |
---|---|
DATE |
格式为 "YYYY-MM-DD" 的字符串 |
DATETIME |
格式为 "YYYY-MM-DD [HH:MM:SS]" 的字符串 |
TIME |
格式为 "HH:MM:SS" 的字符串 |
TIMESTAMP |
从 1970-01-01(Unix 纪元)开始计算的秒数,或格式为 "YYYY-MM-DD HH:MM[:SS]" 的字符串 |
发送范围数据
对于类型为 RANGE<T>
的字段,请将 tabledata.insertAll
方法中的数据格式设置为包含两个字段(start
和 end
)的 JSON 对象。start
和 end
字段缺失值或值为 null 表示无界限边界。这些字段必须采用类型为 T
的相同受支持 JSON 格式,其中 T
可以是 DATE
、DATETIME
和 TIMESTAMP
中的一个。
在以下示例中,f_range_date
字段表示表中的 RANGE<DATE>
列。使用 tabledata.insertAll
API 将一行插入此列。
{
"f_range_date": {
"start": "1970-01-02",
"end": null
}
}
流式插入的数据的可用性
在 BigQuery 成功确认 tabledata.insertAll
请求后,您便可立即使用 GoogleSQL 查询对数据进行实时分析。
最近流式插入到注入时间分区表的行的 _PARTITIONTIME
伪列临时具有 NULL 值。对于此类行,BigQuery 通常会在几分钟内在后台分配 PARTITIONTIME
列的最终非 NULL 值。在极少数情况下,此过程最长可能需要 90 分钟。
某些在最近流式插入的行可能无法在几分钟内用于表复制。在极少数情况下,此过程最长可能需要 90 分钟。 如需了解数据是否可用于表复制,请检查 tables.get
响应中是否存在名为 streamingBuffer
的部分。如果 streamingBuffer
部分不存在,则数据可供复制。您还可以使用 streamingBuffer.oldestEntryTime
字段标识流式传输缓冲区中记录的存在时间。
尽力去重功能
当您为插入的行提供 insertId
时,BigQuery 使用此 ID 支持尽力去重功能(最长可达一分钟)。也就是说,如果您在此时间段内多次将具有相同 insertId
的同一行流式插入到同一表中,则 BigQuery 可能会删除该行多次出现的重复数据,让数据仅出现一次。
系统认为具有相同 insertId
的行是相同的行。如果两行具有相同的 insertId
,BigQuery 保留哪一行是不确定的。
去重功能通常用于分布式系统中由于错误(例如系统和 BigQuery 之间发生网络错误或 BigQuery 中发生内部错误)而无法确定流式插入的状态时的重试场景。如果重试插入操作,请对同一组行使用相同的 insertId
,让 BigQuery 能够尝试删除重复的数据。如需了解详情,请参阅排查流式插入问题。
BigQuery 提供的是尽力去重功能,不应依赖它作为您的数据中没有重复项的保证机制。此外,BigQuery 随时都有可能降低尽力去重功能的质量,以确保提供更高的数据可靠性和可用性。
如果您有严格的数据去重要求,可选择支持事务的 Google Cloud Datastore 服务。
停用尽力去重功能
对于每一插入行,您可以通过不填充其 insertId
字段,停用尽力去重功能。这是插入数据的推荐方法。
Apache Beam 和 Dataflow
如需在使用 Java 版 Apache Beam 的 BigQuery I/O 连接器时停用尽力去重功能,请使用 ignoreInsertIds()
方法。
手动移除重复项
为确保在完成流式插入后不存在重复行,请使用以下手动过程:
- 将
insertId
作为列添加到表架构中,并在每行的数据中加入insertId
值。 - 在流式插入停止后,执行下列查询以检查重复项:
如果结果大于 1,则表示存在重复项。#standardSQL SELECT MAX(count) FROM( SELECT ID_COLUMN, count(*) as count FROM `TABLE_NAME` GROUP BY ID_COLUMN)
- 如需移除重复项,请运行以下查询。指定目标表、允许大型结果并停用结果扁平化功能。
#standardSQL SELECT * EXCEPT(row_number) FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY ID_COLUMN) row_number FROM `TABLE_NAME`) WHERE row_number = 1
有关重复项移除查询的注意事项:
- 对于重复项移除查询,较为安全的策略是以新表为目标表。或者,也可以使用写入处置
WRITE_TRUNCATE
来以源表为目标表。 - 重复项移除查询会在表架构的末尾添加一个值为
1
的row_number
列。该查询使用 GoogleSQL 中的SELECT * EXCEPT
语句将row_number
列从目标表中排除。#standardSQL
前缀用于为此查询启用 GoogleSQL。或者,您也可按特定列名称进行选择以忽略此列。 - 要查询移除了重复项的实时数据,您还可以使用重复项移除查询来经由表创建视图。请注意,系统将根据视图中所选的列计算视图的查询费用,这可能导致字节扫描大小较大。
将数据流式插入到时间分区表
将数据流式插入到时间分区表时,每个分区都有一个流式缓冲区。如果您将 writeDisposition
属性设置为 WRITE_TRUNCATE
,当执行会覆盖分区的加载、查询或复制作业时,系统会保留流式缓冲区。如果您想移除流式缓冲区,请对该分区调用 tables.get
以验证流式缓冲区是否为空。
提取时间分区
流式传输到注入时间分区表时,BigQuery 会根据当前世界协调时间 (UTC) 推断目标分区。
新到达的数据会临时放置在 __UNPARTITIONED__
分区中,同时存储在流式缓冲区中。当未分区数据累积到足够多时,BigQuery 会将数据进行分区并放置到正确的分区中。但是,对于数据移出 __UNPARTITIONED__
分区所需的时间,没有服务等级协议 (SLA)。查询可通过使用某一伪列([_PARTITIONTIME
] 或 [_PARTITIONDATE
],具体取决于您的首选数据类型)过滤掉 __UNPARTITIONED__
分区中的 NULL
值,从查询中排除流式缓冲区中的数据。
如果您要将数据流式插入每日分区表,则可以通过在 insertAll
请求中添加分区修饰器来替换推断出的日期。在 tableId
参数中添加修饰器。例如,您可以使用以下分区修饰器将数据流式插入到 table1
表中与 2021-03-01 对应的分区:
table1$20210301
在使用分区修饰器流式插入数据时,您可以将数据流式插入到日期介于当前日期(基于当前世界协调时间 (UTC))之前 31 天至之后 16 天之间的分区。如需将数据写入到日期在这些允许范围之外的分区,请改为使用加载作业或查询作业,相关说明请参阅对分区表数据执行附加和覆盖操作。
只有每日分区表支持使用分区修饰器进行流式插入。每小时、每月或每年分区表不支持此操作。
如需进行测试,您可以使用 bq 命令行工具 bq insert
CLI 命令。例如,以下命令会将一行数据流式传输到名为 mydataset.mytable
的分区表中日期为 2017 年 1 月 1 日 ($20170101
) 的一个分区内:
echo '{"a":1, "b":2}' | bq insert 'mydataset.mytable$20170101'
时间单位列分区
您可以将数据流式插入到按 DATE
、DATETIME
或 TIMESTAMP
列分区且列值介于过去 5 年到未来 1 年之间的表。不在此范围内的数据会被拒绝。
流式插入数据时,数据最初放在 __UNPARTITIONED__
分区中。当未分区的数据累积到足够多时,BigQuery 会自动对数据进行重新分区,并将其放入相应的分区中。
但是,对于数据移出 __UNPARTITIONED__
分区所需的时间,没有服务等级协议 (SLA)。
- 注意:每日分区的处理方式与每小时、每月和每年分区不同。只有超出日期范围(过去 7 天到未来 3 天)的数据才会提取到 UNPARTITIONED 分区,等待重新分区。另一方面,对于每小时分区表,数据始终会被提取到 UNPARTITIONED 分区,然后进行重新分区。
使用模板表自动创建表
模板表提供了一种将逻辑表拆分为多个较小表的机制,以创建较小的数据集(例如,按用户 ID)。模板表有许多限制,如下所述。建议使用分区表和聚簇表来实现此行为。
如需通过 BigQuery API 使用模板表,请在 insertAll
请求中添加 templateSuffix
参数。对于 bq 命令行工具,请在您的 insert
命令中添加 template_suffix
标志。如果 BigQuery 检测到 templateSuffix
参数或 template_suffix
标志,它会将目标表视为基本模板,并创建一个架构与目标表相同且名称中包含指定后缀的新表:
<targeted_table_name> + <templateSuffix>
使用模板表,免去了逐一创建每个表以及为每个表指定架构的麻烦。您只需创建单个模板并提供不同后缀,BigQuery 便可为您创建新表。BigQuery 会将这些表置于同一项目和数据集中。
通过模板表创建的表通常只需几秒即可使用。极少数情况下可能需要更多时间。
更改模板表架构
如果更改模板表架构,则后续生成的所有表都将使用更新的架构。除非现有表仍具有流式传输缓冲区,否则之前生成的表不受影响。
对于仍具有流式缓冲区的现有表,如果以向后兼容的方式修改模板表架构,则以主动流式插入方式生成的表的架构也会进行更新。但是,如果不以向后兼容的方式修改模板表架构,则所有使用旧架构的缓冲数据都将丢失。此外,如果已生成的现有表使用现已不兼容的旧架构,则您也无法将新数据流式插入到这些表中。
更改模板表架构后,请等待更改传播完成,然后再尝试插入新数据或查询生成的表。插入新字段的请求几分钟就会成功。而尝试查询新字段最多可能需要等待 90 分钟。
如果要更改已生成的表的架构,则直到通过模板表的流式插入停止,并且 tables.get()
响应中没有已生成表的流式插入统计信息部分(表示表中没有缓冲的数据),才能更改架构。
模板表详情
- 模板后缀值
templateSuffix
(或--template_suffix
)值只能包含字母(a-z、A-Z)、数字 (0-9) 或下划线 (_)。表名称和表后缀总共不得超过 1024 个字符。- 配额
模板表受到流式插入配额的限制。您的项目每秒最多可以发出 10 个包含模板表的表(类似于
tables.insert
API)。此配额仅适用于正在创建的表,而不适用于要修改的表。如果您的应用需要每秒创建 10 个以上的表,我们建议您使用聚簇表。例如,您可以将高基数表 ID 放入单个聚簇表的键列中。
- 生存时间
生成的表会从数据集继承到期时间。与普通的流式插入数据一样,无法立即复制生成的表。
- 去重功能
仅会在针对目标表的相同引用之间发生重复信息删除。 例如,如果同时使用模板表和常规
insertAll
命令将数据流式插入到生成的表,则在通过模板表和常规insertAll
命令插入的行之间不会发生去重操作。- 视图
模板表和生成的表不应为视图。
排查流式插入问题
以下部分讨论如何排查在使用旧版流式插入 API 将数据流式插入到 BigQuery 时发生的错误。如需详细了解如何解决流式插入的配额错误,请参阅流式插入配额错误。
失败 HTTP 响应代码
如果收到失败的 HTTP 响应代码(如网络连接错误),则无法确定流式插入是否成功。如果尝试重新发送请求,则可能导致最终表中出现重复的行。为了避免表中出现重复的内容,请在发送请求时设置 insertId
属性。BigQuery 会使用 insertId
属性执行去重操作。
如果您收到权限错误、表名称无效错误或超出配额错误,则系统不会插入任何行,并且整个请求都会失败。
成功 HTTP 响应代码
即使您收到成功 HTTP 响应代码,也需要检查响应的 insertErrors
属性才能确定是否成功插入行,因为 BigQuery 可能只是成功插入了部分行。 您可能会遇到以下情况之一:
- 已成功插入所有行。如果
insertErrors
属性是空列表,则表示所有行均已成功插入。 - 已成功插入一些行。
insertErrors
属性中指示的行并未插入,而其他所有行则已成功插入(除非任何行中存在架构不匹配的情况)。errors
属性详细说明了每个未成功插入行的失败原因。index
属性指示请求中与错误对应的行索引(从 0 开始)。 - 未成功插入任何行。如果 BigQuery 在请求的个别行上遇到架构不匹配的情况,则系统不会插入任何行,并会针对每一行(即使是架构匹配的行)返回一个
insertErrors
条目。对于架构匹配的行,其所对应错误的reason
属性将设置为stopped
,因此您可以按原样重新发送这些行。而对于插入失败的行,其会包含有关架构不匹配情况的详细信息。如需了解每种 BigQuery 数据类型支持的协议缓冲区类型,请参阅数据类型转换。
流式插入的元数据错误
由于 BigQuery 的流式插入 API 可以实现高插入速率,因此在与流式插入系统进行交互时,对底层表元数据展示的修改最终将保持一致。大多数时候,元数据更改会在几分钟内完成传送,但在此期间,API 响应可能会反映表的不一致状态。
部分情况包括:
- 架构更改。针对最近接收了流式插入内容的表修改架构时,响应可能会指出架构不匹配错误,因为流式插入系统可能不会立即反映架构更改。
- 表创建/删除。如果将数据流式插入到不存在的表,则系统将返回
notFound
响应的变体。后续流式插入作业可能无法立即识别在响应中创建的表。同样地,删除或重新创建表可能造成在一段时间内流式插入实际上传输到旧表。流式插入可能不会出现在新表中。 - 表截断。截断表的数据(通过使用 WRITE_TRUNCATE 的 writeDisposition 的查询作业)同样可能导致在一致性期间的后续插入丢失。
数据丢失/不可用
流式插入临时驻留在写入优化存储空间中,该存储空间具有不同于代管式存储空间的可用性特征。BigQuery 中的某些操作不与写入优化存储空间交互,例如表复制作业和 tabledata.list
等 API 方法。最近流式插入的数据将不会出现在目标表或输出中。