创建持续查询

本文档介绍了如何在 BigQuery 中运行持续查询

BigQuery 持续查询是指连续运行的 SQL 语句。借助持续查询,您可以实时分析 BigQuery 中的传入数据,然后将结果导出到 Bigtable 或 Pub/Sub,或将结果写入 BigQuery 表。

选择账号类型

您可以使用用户账号创建和运行持续查询作业,也可以使用用户账号创建持续查询作业,然后使用服务账号运行该作业。您必须使用服务账号运行将结果导出到 Pub/Sub 主题的持续查询。

使用用户账号时,持续查询的运行时长为两天。使用服务账号时,持续查询会一直运行,直到明确取消为止。如需了解详情,请参阅授权

所需权限

本部分介绍创建和运行持续查询所需的权限。除了上述 Identity and Access Management (IAM) 角色之外,您还可以通过自定义角色来获取所需的权限。

使用用户账号时的权限

本部分介绍了使用用户账号创建和运行持续查询所需的角色和权限。

如需在 BigQuery 中创建作业,用户账号必须具有 bigquery.jobs.create IAM 权限。以下每个 IAM 角色都授予 bigquery.jobs.create 权限:

如需从 BigQuery 表导出数据,用户账号必须具有 bigquery.tables.export IAM 权限。以下每个 IAM 角色均会授予 bigquery.tables.export 权限:

如需更新 BigQuery 表中的数据,用户账号必须具有 bigquery.tables.updateData IAM 权限。以下每个 IAM 角色均会授予 bigquery.tables.updateData 权限:

如果用户账号必须启用持续查询用例所需的 API,则该用户账号必须具有 Service Usage Admin (roles/serviceusage.serviceUsageAdmin) 角色。

使用服务账号时的权限

本部分介绍了创建持续查询的用户账号和运行持续查询的服务账号所需的角色和权限。

用户账号权限

如需在 BigQuery 中创建作业,用户账号必须具有 bigquery.jobs.create IAM 权限。以下每个 IAM 角色均会授予 bigquery.jobs.create 权限:

如需提交使用服务账号运行的作业,用户账号必须具有 Service Account User (roles/iam.serviceAccountUser) 角色。如果您使用同一用户账号创建服务账号,则该用户账号必须具有 Service Account Admin (roles/iam.serviceAccountAdmin) 角色。如需了解如何限制用户对单个服务账号(而非项目中的所有服务账号)的访问权限,请参阅授予单个角色

如果用户账号必须启用持续查询用例所需的 API,则该用户账号必须具有 Service Usage Admin (roles/serviceusage.serviceUsageAdmin) 角色。

服务账号权限

如需从 BigQuery 表导出数据,服务账号必须拥有 bigquery.tables.export IAM 权限。以下每个 IAM 角色均会授予 bigquery.tables.export 权限:

如需更新 BigQuery 表中的数据,服务账号必须具有 bigquery.tables.updateData IAM 权限。以下每个 IAM 角色均会授予 bigquery.tables.updateData 权限:

准备工作

  1. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  2. Make sure that billing is enabled for your Google Cloud project.

  3. Enable the BigQuery API.

    Enable the API

创建预留

创建企业版或企业 Plus 版预留,然后创建使用 CONTINUOUS 作业类型的预留分配

为持续查询创建预留分配时,关联的预留不得超过 500 个槽,并且无法配置为使用自动扩缩

导出到 Pub/Sub

如需将数据导出到 Pub/Sub,您需要额外的 API、IAM 权限和 Google Cloud 资源。如需了解详情,请参阅导出到 Pub/Sub

将自定义属性作为元数据嵌入到 Pub/Sub 消息中

您可以使用 Pub/Sub 属性提供有关消息的附加信息,例如其优先级、来源、目的地或其他元数据。您还可以使用属性过滤订阅中的消息

在持续查询结果中,如果某个列的名称为 _ATTRIBUTES,则其值会复制到 Pub/Sub 消息属性。_ATTRIBUTES 中提供的字段用作属性键。

_ATTRIBUTES 列必须为 JSON 类型,格式为 ARRAY<STRUCT<STRING, STRING>>STRUCT<STRING>

如需查看示例,请参阅将数据导出到 Pub/Sub 主题

导出到 Bigtable

如需将数据导出到 Bigtable,您需要具备其他 API、IAM 权限和 Google Cloud 资源。如需了解详情,请参阅导出到 Bigtable

将数据写入 BigQuery 表

您可以使用 INSERT 语句将数据写入 BigQuery 表。

使用 AI 函数

若要在持续查询中使用受支持的 AI 函数,您需要具备额外的 API、IAM 权限和 Google Cloud 资源。如需了解详情,请根据您的应用场景参阅以下某个主题:

在持续查询中使用 AI 函数时,请考虑查询输出是否会超出该函数的配额。如果超出配额,您可能需要单独处理未处理的记录。

使用用户账号运行持续查询

本部分介绍了如何使用用户账号运行持续查询。持续查询运行后,您可以关闭 Google Cloud 控制台、终端窗口或应用,而不会中断查询执行。

如需运行持续查询,请按以下步骤操作:

控制台

  1. 在 Google Cloud 控制台中,转到 BigQuery 页面。

    转到 BigQuery

  2. 在查询编辑器中,点击更多

  3. 选择查询模式部分,选择持续查询

  4. 点击确认

  5. 在查询编辑器中,输入持续查询的 SQL 语句。SQL 语句只能包含支持的操作

  6. 点击运行

bq

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 在 Cloud Shell 中,使用带有 --continuous 标志的 bq query 命令运行持续查询:

    bq query --use_legacy_sql=false --continuous=true
    'QUERY'

    QUERY 替换为持续查询的 SQL 语句。SQL 语句只能包含支持的操作

API

通过调用 jobs.insert 方法运行持续查询。您必须在传入的 Job 资源JobConfigurationQuery 中将 continuous 字段设置为 true

curl --request POST \
  'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs
  --header 'Authorization: Bearer $(gcloud auth application-default print-access-token)' \
  --header 'Accept: application/json' \
  --header 'Content-Type: application/json' \
  --data '("configuration":("continuous":true,"useLegacySql":false,"query":"QUERY"))'
  --compressed

替换以下内容:

  • PROJECT_ID:您的项目 ID。
  • QUERY:持续查询的 SQL 语句。SQL 语句只能包含支持的操作

使用服务账号运行持续查询

本部分介绍了如何使用服务账号运行持续查询。持续查询运行后,您可以关闭 Google Cloud 控制台、终端窗口或应用,而不会中断查询执行。

如需使用服务账号运行持续查询,请按以下步骤操作:

控制台

  1. 创建服务账号
  2. 向服务账号授予所需权限
  3. 在 Google Cloud 控制台中,转到 BigQuery 页面。

    转到 BigQuery

  4. 在查询编辑器中,点击更多

  5. 选择查询模式部分,选择持续查询

  6. 点击确认

  7. 在查询编辑器中,点击更多 > 查询设置

  8. 持续查询部分中,使用服务账号框选择您创建的服务账号。

  9. 点击保存

  10. 在查询编辑器中,输入持续查询的 SQL 语句。SQL 语句只能包含支持的操作

  11. 点击运行

bq

  1. 创建服务账号
  2. 向服务账号授予所需权限
  3. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  4. 在命令行上,使用带有以下标志的 bq query 命令运行持续查询:

    • --continuous 标志设置为 true 以使查询持续。
    • 使用 --connection_property 标志指定要使用的服务账号。
    bq query --project_id=PROJECT_ID --use_legacy_sql=false \
    --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \
    'QUERY'

    替换以下内容:

    • PROJECT_ID:您的项目 ID。
    • SERVICE_ACCOUNT_EMAIL:服务账号电子邮件地址。您可以在 Google Cloud 控制台的服务账号页面上获取服务账号电子邮件地址。
    • QUERY:持续查询的 SQL 语句。SQL 语句只能包含支持的操作

API

  1. 创建服务账号
  2. 向服务账号授予所需权限
  3. 通过调用 jobs.insert 方法运行持续查询。在传入的 Job 资源JobConfigurationQuery 资源中设置以下字段:

    • continuous 字段设置为 true 以使查询持续。
    • 使用 connection_property 字段指定要使用的服务账号。
    curl --request POST \
      'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs
      --header 'Authorization: Bearer $(gcloud auth print-access-token) \
      --header 'Accept: application/json' \
      --header 'Content-Type: application/json' \
      --data '("configuration":("query":"QUERY","useLegacySql":false,"continuous":true,"connectionProperties":["key": "service_account","value":"SERVICE_ACCOUNT_EMAIL"]))' \
      --compressed

    替换以下内容:

    • PROJECT_ID:您的项目 ID。
    • QUERY:持续查询的 SQL 语句。SQL 语句只能包含支持的操作
    • SERVICE_ACCOUNT_EMAIL:服务账号电子邮件地址。您可以在 Google Cloud 控制台的服务账号页面上获取服务账号电子邮件地址。

示例

以下 SQL 示例展示了持续查询的常见用例。

将数据导出到 Pub/Sub 主题

以下示例展示了一个持续查询,该查询会从接收流式出租车行程信息的 BigQuery 表中过滤数据,并将数据附带消息属性实时发布到 Pub/Sub 主题:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude)) AS message,
    TO_JSON(
      STRUCT(
        CAST(passenger_count AS STRING) AS passenger_count)) AS _ATTRIBUTES
  FROM `myproject.real_time_taxi_streaming.taxi_rides`
  WHERE ride_status = 'enroute'
);

将数据导出到 Bigtable 表

以下示例展示了一个持续查询,该查询会从接收流式出租车行程信息的 BigQuery 表中过滤数据,并将数据实时导出到 Bigtable 表中:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_BIGTABLE',
    truncate = TRUE,
    overwrite = TRUE,
    uri = 'https://bigtable.googleapis.com/projects/myproject/instances/mybigtableinstance/tables/taxi-real-time-rides')
AS (
  SELECT
    CAST(CONCAT(ride_id, timestamp, latitude, longitude) AS STRING) AS rowkey,
    STRUCT(
      timestamp,
      latitude,
      longitude,
      meter_reading,
      ride_status,
      passenger_count) AS features
  FROM `myproject.real_time_taxi_streaming.taxirides`
  WHERE ride_status = 'enroute'
);

将数据写入 BigQuery 表

以下示例展示了一个持续查询,该查询会过滤和转换接收流式出租车行程信息的 BigQuery 表中的数据,然后将数据实时写入另一个 BigQuery 表。这样一来,数据就可以供进一步的下游分析使用。

INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides`
SELECT
  timestamp,
  meter_reading,
  ride_status,
  passenger_count,
  ST_Distance(
    ST_GeogPoint(pickup_longitude, pickup_latitude),
    ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance,
    SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger
FROM `myproject.real_time_taxi_streaming.taxirides`
WHERE
  ride_status = 'dropoff';

使用 Vertex AI 模型处理数据

以下示例展示了一个持续查询,该查询使用 Vertex AI 模型根据出租车乘客的当前经纬度生成广告,然后将结果实时导出到 Pub/Sub 主题:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude,
        prompt,
        ml_generate_text_llm_result)) AS message
  FROM
    ML.GENERATE_TEXT(
      MODEL `myproject.real_time_taxi_streaming.taxi_ml_generate_model`,
      (
        SELECT
          timestamp,
          ride_id,
          latitude,
          longitude,
          CONCAT(
            'Generate an ad based on the current latitude of ',
            latitude,
            ' and longitude of ',
            longitude) AS prompt
        FROM `myproject.real_time_taxi_streaming.taxirides`
        WHERE ride_status = 'enroute'
      ),
      STRUCT(
        50 AS max_output_tokens,
        1.0 AS temperature,
        40 AS top_k,
        1.0 AS top_p,
        TRUE AS flatten_json_output))
      AS ml_output
);

从特定时间点开始持续查询

当您启动持续查询时,系统会处理您要从中选择表中的所有行,然后处理新行。如果您想跳过处理部分或全部现有数据,可以使用 APPENDS 更改历史记录函数从特定时间点开始处理。

以下示例展示了如何使用 APPENDS 函数从特定时间点开始持续查询:

EXPORT DATA
  OPTIONS (format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS (
  SELECT
    TO_JSON_STRING(STRUCT(ride_id,
        timestamp,
        latitude,
        longitude)) AS message
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxi_rides`, '2024-06-12 01:23:03.652423 UTC', NULL)
  WHERE
    ride_status = 'enroute');

修改持续查询的 SQL

在持续查询作业运行期间,您无法更新持续查询中使用的 SQL。您必须取消持续查询作业、修改 SQL,然后从停止原始持续查询作业的时间点开始启动新的持续查询作业。

如需修改持续查询中使用的 SQL,请按以下步骤操作:

  1. 查看要更新的持续查询作业的作业详情,并记下作业 ID。
  2. 如果可能,请暂停收集上游数据。如果您无法执行此操作,则在重新启动持续查询时可能会出现一些数据重复。
  3. 取消要修改的持续查询
  4. 使用 INFORMATION_SCHEMA JOBS 视图获取原始持续查询作业的 end_time 值:

    SELECT end_time
    FROM `PROJECT_ID.region-REGION`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
    WHERE
      EXTRACT(DATE FROM creation_time) = current_date()
    AND error_result.reason = 'stopped'
    AND job_id = 'JOB_ID';

    替换以下内容:

    • PROJECT_ID:您的项目 ID。
    • REGION:您的项目使用的区域。
    • JOB_ID:您在第 1 步中标识的持续查询作业 ID。
  5. 修改持续查询 SQL 语句,以从特定时间点开始持续查询,并使用您在第 5 步中检索到的 end_time 值作为起点。

  6. 修改持续查询 SQL 语句,以反映您需要的更改。

  7. 运行修改后的持续查询。

取消持续查询

您可以像取消任何其他作业一样取消持续查询作业。取消作业后,查询最长可能需要一分钟才能停止运行。

后续步骤