创建持续查询
本文档介绍了如何在 BigQuery 中运行持续查询。
BigQuery 持续查询是指连续运行的 SQL 语句。借助持续查询,您可以实时分析 BigQuery 中的传入数据,然后将结果导出到 Bigtable 或 Pub/Sub,或将结果写入 BigQuery 表。
选择账号类型
您可以使用用户账号创建和运行持续查询作业,也可以使用用户账号创建持续查询作业,然后使用服务账号运行该作业。您必须使用服务账号运行将结果导出到 Pub/Sub 主题的持续查询。
使用用户账号时,持续查询的运行时长为两天。使用服务账号时,持续查询会一直运行,直到明确取消为止。如需了解详情,请参阅授权。
所需权限
本部分介绍创建和运行持续查询所需的权限。除了上述 Identity and Access Management (IAM) 角色之外,您还可以通过自定义角色来获取所需的权限。
使用用户账号时的权限
本部分介绍了使用用户账号创建和运行持续查询所需的角色和权限。
如需在 BigQuery 中创建作业,用户账号必须具有 bigquery.jobs.create
IAM 权限。以下每个 IAM 角色都授予 bigquery.jobs.create
权限:
- BigQuery User (
roles/bigquery.user
) - BigQuery Job User (
roles/bigquery.jobUser
) - BigQuery Admin (
roles/bigquery.admin
)
如需从 BigQuery 表导出数据,用户账号必须具有 bigquery.tables.export
IAM 权限。以下每个 IAM 角色均会授予 bigquery.tables.export
权限:
- BigQuery Data Viewer (
roles/bigquery.dataViewer
) - BigQuery Data Editor (
roles/bigquery.dataEditor
) - BigQuery Data Owner (
roles/bigquery.dataOwner
) - BigQuery Admin (
roles/bigquery.admin
)
如需更新 BigQuery 表中的数据,用户账号必须具有 bigquery.tables.updateData
IAM 权限。以下每个 IAM 角色均会授予 bigquery.tables.updateData
权限:
- BigQuery Data Editor (
roles/bigquery.dataEditor
) - BigQuery Data Owner (
roles/bigquery.dataOwner
) - BigQuery Admin (
roles/bigquery.admin
)
如果用户账号必须启用持续查询用例所需的 API,则该用户账号必须具有 Service Usage Admin (roles/serviceusage.serviceUsageAdmin
) 角色。
使用服务账号时的权限
本部分介绍了创建持续查询的用户账号和运行持续查询的服务账号所需的角色和权限。
用户账号权限
如需在 BigQuery 中创建作业,用户账号必须具有 bigquery.jobs.create
IAM 权限。以下每个 IAM 角色均会授予 bigquery.jobs.create
权限:
- BigQuery User (
roles/bigquery.user
) - BigQuery Job User (
roles/bigquery.jobUser
) - BigQuery Admin (
roles/bigquery.admin
)
如需提交使用服务账号运行的作业,用户账号必须具有 Service Account User (roles/iam.serviceAccountUser
) 角色。如果您使用同一用户账号创建服务账号,则该用户账号必须具有 Service Account Admin (roles/iam.serviceAccountAdmin
) 角色。如需了解如何限制用户对单个服务账号(而非项目中的所有服务账号)的访问权限,请参阅授予单个角色。
如果用户账号必须启用持续查询用例所需的 API,则该用户账号必须具有 Service Usage Admin (roles/serviceusage.serviceUsageAdmin
) 角色。
服务账号权限
如需从 BigQuery 表导出数据,服务账号必须拥有 bigquery.tables.export
IAM 权限。以下每个 IAM 角色均会授予 bigquery.tables.export
权限:
- BigQuery Data Viewer (
roles/bigquery.dataViewer
) - BigQuery Data Editor (
roles/bigquery.dataEditor
) - BigQuery Data Owner (
roles/bigquery.dataOwner
) - BigQuery Admin (
roles/bigquery.admin
)
bigquery.tables.updateData
IAM 权限。以下每个 IAM 角色均会授予 bigquery.tables.updateData
权限:
- BigQuery Data Editor (
roles/bigquery.dataEditor
) - BigQuery Data Owner (
roles/bigquery.dataOwner
) - BigQuery Admin (
roles/bigquery.admin
)
准备工作
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the BigQuery API.
创建预留
创建企业版或企业 Plus 版预留,然后创建使用 CONTINUOUS
作业类型的预留分配。
为持续查询创建预留分配时,关联的预留不得超过 500 个槽,并且无法配置为使用自动扩缩。
导出到 Pub/Sub
如需将数据导出到 Pub/Sub,您需要额外的 API、IAM 权限和 Google Cloud 资源。如需了解详情,请参阅导出到 Pub/Sub。
将自定义属性作为元数据嵌入到 Pub/Sub 消息中
您可以使用 Pub/Sub 属性提供有关消息的附加信息,例如其优先级、来源、目的地或其他元数据。您还可以使用属性过滤订阅中的消息。
在持续查询结果中,如果某个列的名称为 _ATTRIBUTES
,则其值会复制到 Pub/Sub 消息属性。_ATTRIBUTES
中提供的字段用作属性键。
_ATTRIBUTES
列必须为 JSON
类型,格式为 ARRAY<STRUCT<STRING, STRING>>
或 STRUCT<STRING>
。
如需查看示例,请参阅将数据导出到 Pub/Sub 主题。
导出到 Bigtable
如需将数据导出到 Bigtable,您需要具备其他 API、IAM 权限和 Google Cloud 资源。如需了解详情,请参阅导出到 Bigtable。
将数据写入 BigQuery 表
您可以使用 INSERT
语句将数据写入 BigQuery 表。
使用 AI 函数
若要在持续查询中使用受支持的 AI 函数,您需要具备额外的 API、IAM 权限和 Google Cloud 资源。如需了解详情,请根据您的应用场景参阅以下某个主题:
- 使用
ML.GENERATE_TEXT
函数生成文本 - 使用
ML.GENERATE_EMBEDDING
函数生成文本嵌入 - 使用
ML.UNDERSTAND_TEXT
函数理解文本 - 使用
ML.TRANSLATE
函数翻译文本
在持续查询中使用 AI 函数时,请考虑查询输出是否会超出该函数的配额。如果超出配额,您可能需要单独处理未处理的记录。
使用用户账号运行持续查询
本部分介绍了如何使用用户账号运行持续查询。持续查询运行后,您可以关闭 Google Cloud 控制台、终端窗口或应用,而不会中断查询执行。
如需运行持续查询,请按以下步骤操作:
控制台
在 Google Cloud 控制台中,转到 BigQuery 页面。
在查询编辑器中,点击更多。
在选择查询模式部分,选择持续查询。
点击确认。
在查询编辑器中,输入持续查询的 SQL 语句。SQL 语句只能包含支持的操作。
点击运行。
bq
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
在 Cloud Shell 中,使用带有
--continuous
标志的bq query
命令运行持续查询:bq query --use_legacy_sql=false --continuous=true 'QUERY'
将
QUERY
替换为持续查询的 SQL 语句。SQL 语句只能包含支持的操作。
API
通过调用 jobs.insert
方法运行持续查询。您必须在传入的 Job
资源的 JobConfigurationQuery
中将 continuous
字段设置为 true
。
curl --request POST \ 'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs --header 'Authorization: Bearer $(gcloud auth application-default print-access-token)' \ --header 'Accept: application/json' \ --header 'Content-Type: application/json' \ --data '("configuration":("continuous":true,"useLegacySql":false,"query":"QUERY"))' --compressed
替换以下内容:
PROJECT_ID
:您的项目 ID。QUERY
:持续查询的 SQL 语句。SQL 语句只能包含支持的操作。
使用服务账号运行持续查询
本部分介绍了如何使用服务账号运行持续查询。持续查询运行后,您可以关闭 Google Cloud 控制台、终端窗口或应用,而不会中断查询执行。
如需使用服务账号运行持续查询,请按以下步骤操作:
控制台
bq
- 创建服务账号。
- 向服务账号授予所需权限。
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
在命令行上,使用带有以下标志的
bq query
命令运行持续查询:- 将
--continuous
标志设置为true
以使查询持续。 - 使用
--connection_property
标志指定要使用的服务账号。
bq query --project_id=PROJECT_ID --use_legacy_sql=false \ --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \ 'QUERY'
替换以下内容:
- 将
API
- 创建服务账号。
- 向服务账号授予所需权限。
通过调用
jobs.insert
方法运行持续查询。在传入的Job
资源的JobConfigurationQuery
资源中设置以下字段:- 将
continuous
字段设置为true
以使查询持续。 - 使用
connection_property
字段指定要使用的服务账号。
curl --request POST \ 'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs --header 'Authorization: Bearer $(gcloud auth print-access-token) \ --header 'Accept: application/json' \ --header 'Content-Type: application/json' \ --data '("configuration":("query":"QUERY","useLegacySql":false,"continuous":true,"connectionProperties":["key": "service_account","value":"SERVICE_ACCOUNT_EMAIL"]))' \ --compressed
替换以下内容:
- 将
示例
以下 SQL 示例展示了持续查询的常见用例。
将数据导出到 Pub/Sub 主题
以下示例展示了一个持续查询,该查询会从接收流式出租车行程信息的 BigQuery 表中过滤数据,并将数据附带消息属性实时发布到 Pub/Sub 主题:
EXPORT DATA OPTIONS ( format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING( STRUCT( ride_id, timestamp, latitude, longitude)) AS message, TO_JSON( STRUCT( CAST(passenger_count AS STRING) AS passenger_count)) AS _ATTRIBUTES FROM `myproject.real_time_taxi_streaming.taxi_rides` WHERE ride_status = 'enroute' );
将数据导出到 Bigtable 表
以下示例展示了一个持续查询,该查询会从接收流式出租车行程信息的 BigQuery 表中过滤数据,并将数据实时导出到 Bigtable 表中:
EXPORT DATA OPTIONS ( format = 'CLOUD_BIGTABLE', truncate = TRUE, overwrite = TRUE, uri = 'https://bigtable.googleapis.com/projects/myproject/instances/mybigtableinstance/tables/taxi-real-time-rides') AS ( SELECT CAST(CONCAT(ride_id, timestamp, latitude, longitude) AS STRING) AS rowkey, STRUCT( timestamp, latitude, longitude, meter_reading, ride_status, passenger_count) AS features FROM `myproject.real_time_taxi_streaming.taxirides` WHERE ride_status = 'enroute' );
将数据写入 BigQuery 表
以下示例展示了一个持续查询,该查询会过滤和转换接收流式出租车行程信息的 BigQuery 表中的数据,然后将数据实时写入另一个 BigQuery 表。这样一来,数据就可以供进一步的下游分析使用。
INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides` SELECT timestamp, meter_reading, ride_status, passenger_count, ST_Distance( ST_GeogPoint(pickup_longitude, pickup_latitude), ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance, SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger FROM `myproject.real_time_taxi_streaming.taxirides` WHERE ride_status = 'dropoff';
使用 Vertex AI 模型处理数据
以下示例展示了一个持续查询,该查询使用 Vertex AI 模型根据出租车乘客的当前经纬度生成广告,然后将结果实时导出到 Pub/Sub 主题:
EXPORT DATA OPTIONS ( format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING( STRUCT( ride_id, timestamp, latitude, longitude, prompt, ml_generate_text_llm_result)) AS message FROM ML.GENERATE_TEXT( MODEL `myproject.real_time_taxi_streaming.taxi_ml_generate_model`, ( SELECT timestamp, ride_id, latitude, longitude, CONCAT( 'Generate an ad based on the current latitude of ', latitude, ' and longitude of ', longitude) AS prompt FROM `myproject.real_time_taxi_streaming.taxirides` WHERE ride_status = 'enroute' ), STRUCT( 50 AS max_output_tokens, 1.0 AS temperature, 40 AS top_k, 1.0 AS top_p, TRUE AS flatten_json_output)) AS ml_output );
从特定时间点开始持续查询
当您启动持续查询时,系统会处理您要从中选择表中的所有行,然后处理新行。如果您想跳过处理部分或全部现有数据,可以使用 APPENDS
更改历史记录函数从特定时间点开始处理。
以下示例展示了如何使用 APPENDS
函数从特定时间点开始持续查询:
EXPORT DATA OPTIONS (format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING(STRUCT(ride_id, timestamp, latitude, longitude)) AS message FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxi_rides`, '2024-06-12 01:23:03.652423 UTC', NULL) WHERE ride_status = 'enroute');
修改持续查询的 SQL
在持续查询作业运行期间,您无法更新持续查询中使用的 SQL。您必须取消持续查询作业、修改 SQL,然后从停止原始持续查询作业的时间点开始启动新的持续查询作业。
如需修改持续查询中使用的 SQL,请按以下步骤操作:
- 查看要更新的持续查询作业的作业详情,并记下作业 ID。
- 如果可能,请暂停收集上游数据。如果您无法执行此操作,则在重新启动持续查询时可能会出现一些数据重复。
- 取消要修改的持续查询。
使用
INFORMATION_SCHEMA
JOBS
视图获取原始持续查询作业的end_time
值:SELECT end_time FROM `PROJECT_ID.region-REGION`.INFORMATION_SCHEMA.JOBS_BY_PROJECT WHERE EXTRACT(DATE FROM creation_time) = current_date() AND error_result.reason = 'stopped' AND job_id = 'JOB_ID';
替换以下内容:
PROJECT_ID
:您的项目 ID。REGION
:您的项目使用的区域。JOB_ID
:您在第 1 步中标识的持续查询作业 ID。
修改持续查询 SQL 语句,以从特定时间点开始持续查询,并使用您在第 5 步中检索到的
end_time
值作为起点。修改持续查询 SQL 语句,以反映您需要的更改。
运行修改后的持续查询。
取消持续查询
您可以像取消任何其他作业一样取消持续查询作业。取消作业后,查询最长可能需要一分钟才能停止运行。