创建 BigQuery 订阅

本文档介绍了如何创建 BigQuery 订阅。您可以使用 Google Cloud 控制台、Google Cloud CLI、客户端库或 Pub/Sub API 创建 BigQuery 订阅。

准备工作

在阅读本文档之前,请确保您熟悉以下内容:

除了熟悉 Pub/Sub 和 BigQuery 之外,您还需要确保在创建 BigQuery 订阅之前满足以下前提条件:

  • 存在 BigQuery 表。或者,您也可以在创建 BigQuery 订阅时创建一个,如本文档后面的部分所述。

  • Pub/Sub 主题架构与 BigQuery 表架构之间的兼容性。如果您添加了不兼容的 BigQuery 表,则会收到与兼容性相关的错误消息。如需了解详情,请参阅架构兼容性

所需的角色和权限

以下是有关角色和权限的准则列表:

  • 如需创建订阅,您必须在项目级配置访问权限控制。

  • 如果您的订阅和主题位于不同的项目中,您还需要具备资源级权限,如本部分后面所述。

  • 若要创建 BigQuery 订阅,Pub/Sub 服务账号必须有权写入特定 BigQuery 表。如需详细了解如何授予这些权限,请参阅本文档的下一部分。

  • 您可以在一个项目中配置 BigQuery 订阅,以便将数据写入其他项目中的 BigQuery 表。

如需获得创建 BigQuery 订阅所需的权限,请让管理员为您授予项目的 Pub/Sub Editor (roles/pubsub.editor) IAM 角色。 如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

此预定义角色包含创建 BigQuery 订阅所需的权限。如需查看所需的确切权限,请展开所需权限部分:

所需权限

如需创建 BigQuery 订阅,需要具备以下权限:

  • 从订阅中拉取: pubsub.subscriptions.consume
  • 创建订阅: pubsub.subscriptions.create
  • 删除订阅: pubsub.subscriptions.delete
  • 获取订阅: pubsub.subscriptions.get
  • 列出订阅: pubsub.subscriptions.list
  • 更新订阅: pubsub.subscriptions.update
  • 将订阅附加到主题: pubsub.topics.attachSubscription
  • 获取订阅的 IAM 政策: pubsub.subscriptions.getIamPolicy
  • 为订阅配置 IAM 政策 pubsub.subscriptions.setIamPolicy

您也可以使用自定义角色或其他预定义角色来获取这些权限。

如果您需要在一个项目中创建与另一个项目中的主题关联的 BigQuery 订阅,请让主题管理员为您授予该主题的 Pub/Sub Editor (roles/pubsub.editor) IAM 角色。

向 Pub/Sub 服务账号分配 BigQuery 角色

某些 Google Cloud 服务具有 Google Cloud 代管式服务账号,可允许服务访问您的资源。这些服务账号称为“服务代理”。Pub/Sub 会为每个项目创建并维护一个服务账号,格式为 service-project-number@gcp-sa-pubsub.iam.gserviceaccount.com

如需创建 BigQuery 订阅,Pub/Sub 服务账号必须有权写入特定 BigQuery 表并读取表元数据。

向 Pub/Sub 服务账号授予 BigQuery Data Editor (roles/bigquery.dataEditor) 角色。

  1. 在 Google Cloud 控制台中,转到 IAM 页面。

    转到 IAM

  2. 点击授予访问权限

  3. 添加主账号部分,输入您的 Pub/Sub 服务账号的名称。服务账号的格式为 service-project-number@gcp-sa-pubsub.iam.gserviceaccount.com。例如,对于项目 project-number=112233445566,服务账号的格式为 service-112233445566@gcp-sa-pubsub.iam.gserviceaccount.com

  4. 分配角色部分中,点击添加其他角色

  5. 选择角色下拉菜单中,输入 BigQuery,然后选择 BigQuery Data Editor 角色

  6. 点击保存

如需详细了解 BigQuery IAM,请参阅 BigQuery 角色和权限

BigQuery 订阅属性

配置 BigQuery 订阅时,您可以指定以下属性。

通用属性

了解您可以在所有订阅中设置的常见订阅属性

使用主题架构

此选项可让 Pub/Sub 使用订阅附加到的 Pub/Sub 主题的架构。此外,Pub/Sub 会将消息中的字段写入 BigQuery 表中的相应列。

使用此选项时,请务必查看以下其他要求:

  • 主题架构和 BigQuery 架构中的字段必须具有相同的名称,并且它们的类型必须彼此兼容。

  • 主题架构中的任何可选字段在 BigQuery 架构中也必须是可选字段。

  • 主题架构中的必需字段在 BigQuery 架构中不必是必需字段。

  • 如果主题架构中不存在某些 BigQuery 字段,则这些 BigQuery 字段必须采用模式 NULLABLE

  • 如果主题架构中包含 BigQuery 架构中不存在的其他字段,并且这些字段可以被舍弃,请选择舍弃未知字段选项。

  • 您只能选择其中一个订阅属性,即使用主题架构使用表架构

如果您未选择使用主题架构使用表架构选项,请确保 BigQuery 表中有一个名为 data 且类型为 BYTESSTRINGJSON 的列。Pub/Sub 会将消息写入此 BigQuery 列。

您可能不会立即看到对 Pub/Sub 主题架构或 BigQuery 表架构所做的更改生效,因为消息会写入 BigQuery 表。例如,如果舍弃未知字段选项处于启用状态,并且 Pub/Sub 架构中存在某个字段,但 BigQuery 架构中不存在该字段,那么将消息写入 BigQuery 表后,该字段可能仍不会包含在 BigQuery 架构中。最终,架构会同步,后续消息会包含该字段。

为 BigQuery 订阅使用使用主题架构选项时,您还可以利用 BigQuery 变更数据捕获 (CDC)。CDC 通过处理更改并将其应用于现有行来更新 BigQuery 表。

如需详细了解此功能,请参阅使用变更数据捕获来流式插入表更新

如需了解如何将此功能与 BigQuery 订阅搭配使用,请参阅 BigQuery 变更数据捕获

使用表架构

通过此选项,Pub/Sub 可以使用 BigQuery 表的架构将 JSON 消息的字段写入相应的列。使用此选项时,请务必查看以下其他要求:

  • 发布的消息必须采用 JSON 格式。

  • 支持以下 JSON 转换:

    JSON 类型 BigQuery 数据类型
    string NUMERICBIGNUMERICDATETIMEDATETIMETIMESTAMP
    number NUMERICBIGNUMERICDATETIMEDATETIMETIMESTAMP
    • number 用于 DATEDATETIMETIMETIMESTAMP 转化时,该数字必须遵循受支持的表示法
    • 使用 number 转换为 NUMERICBIGNUMERIC 时,值的精度和范围仅限于 IEEE 754 浮点算术标准接受的值。如果您需要高精度或更大的值范围,请改用 stringNUMERICBIGNUMERIC 转换。
    • 使用 stringNUMERICBIGNUMERIC 转换时,Pub/Sub 会假定字符串是人类可读的数字(例如 "123.124")。如果将字符串作为人类可读的数字进行处理失败,Pub/Sub 会将字符串视为使用 BigDecimalByteStringEncoder 编码的字节。
  • 如果订阅的主题与架构相关联,则消息编码属性必须设置为 JSON

  • 如果消息中不存在某些 BigQuery 字段,则这些 BigQuery 字段必须采用模式 NULLABLE

  • 如果消息包含 BigQuery 架构中不存在的其他字段,并且可以丢弃这些字段,请选择丢弃未知字段选项。

  • 您只能选择其中一个订阅属性,即使用主题架构使用表架构

如果您未选择使用主题架构使用表架构选项,请确保 BigQuery 表中有一个名为 data 且类型为 BYTESSTRINGJSON 的列。Pub/Sub 会将消息写入此 BigQuery 列。

您可能不会立即看到对 BigQuery 表架构所做的更改生效,因为消息会写入 BigQuery 表。例如,如果舍弃未知字段选项处于启用状态,并且消息中存在某个字段,但该字段不存在于 BigQuery 架构中,那么将消息写入 BigQuery 表后,该字段可能仍不会包含在 BigQuery 架构中。最终,架构会同步,后续消息会包含该字段。

为 BigQuery 订阅使用使用表架构选项时,您还可以利用 BigQuery 变更数据捕获 (CDC)。CDC 通过处理更改并将其应用于现有行来更新 BigQuery 表。

如需详细了解此功能,请参阅使用变更数据捕获来流式插入表更新

如需了解如何将此功能与 BigQuery 订阅搭配使用,请参阅 BigQuery 变更数据捕获

删除未知字段

此选项与使用主题架构使用表架构选项搭配使用。通过此选项,Pub/Sub 会删除主题架构或消息中存在但 BigQuery 架构中不存在的任何字段。如果未设置丢弃未知字段,包含额外字段的消息不会写入 BigQuery,而是会保留在订阅积压消息中。订阅最终会处于错误状态

写入元数据

通过此选项,Pub/Sub 可以将每条消息的元数据写入 BigQuery 表中的其他列。否则,元数据不会写入 BigQuery 表。

如果您选择写入元数据选项,请确保 BigQuery 表包含下表中所述的字段。

如果您未选择写入元数据选项,则目标 BigQuery 表只需要 data 字段,除非 use_topic_schema 为 true。如果您同时选择了写入元数据使用主题架构选项,则主题的架构不得包含任何名称与元数据参数名称匹配的字段。此限制包括这些蛇形命名参数的驼峰命名版本。

参数
subscription_name

STRING

订阅的名称。

message_id

STRING

消息 ID

publish_time

TIMESTAMP

消息的发布时间。

data

BYTES、STRING 或 JSON

消息正文。

对于未选择使用主题架构使用表架构的所有目标 BigQuery 表,data 字段都是必需的。如果该字段的类型为 JSON,则消息正文必须为有效的 JSON。

attributes

STRING 或 JSON

一个包含所有消息属性的 JSON 对象。它还包含 Pub/Sub 消息中的其他字段,包括排序键(如果有)。

创建 BigQuery 订阅

以下示例演示了如何创建使用 BigQuery 传送的订阅。

控制台

  1. 在 Google Cloud 控制台中,前往订阅页面。

    前往“订阅”页面

  2. 点击创建订阅
  3. 订阅 ID 字段中,输入名称。

    如需了解如何命名订阅,请参阅主题或订阅命名指南

  4. 从下拉菜单中选择或创建一个主题。订阅将接收来自该主题的消息。
  5. 传送类型选择为写入 BigQuery
  6. 选择 BigQuery 表的项目。
  7. 选择现有数据集或创建新数据集。

    如需了解如何创建数据集,请参阅创建数据集

  8. 选择现有表格或创建新表格。

    如需了解如何创建表,请参阅创建表

  9. 我们强烈建议您启用死信以处理消息传送失败问题。

    如需了解详情,请参阅“死信主题”

  10. 点击创建

您还可以通过主题页面创建订阅。 此快捷方式可帮助您将主题与订阅关联。

  1. 在 Google Cloud 控制台中,前往主题页面。

    前往“主题”

  2. 点击要创建订阅的主题旁边的
  3. 从上下文菜单中,选择创建订阅
  4. 传送类型选择为写入 BigQuery
  5. 选择 BigQuery 表的项目。
  6. 选择现有数据集或创建新数据集。

    如需了解如何创建数据集,请参阅创建数据集

  7. 选择现有表格或创建新表格。

    如需了解如何创建数据集,请参阅创建表

  8. 我们强烈建议您启用死信以处理消息传送失败问题。

    如需了解详情,请参阅“死信主题”

  9. 点击创建

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 如需创建 Pub/Sub 订阅,请使用 gcloud pubsub subscriptions create 命令:

    gcloud pubsub subscriptions create SUBSCRIPTION_ID \
        --topic=TOPIC_ID \
        --bigquery-table=PROJECT_ID:DATASET_ID.TABLE_ID

    替换以下内容:

    • SUBSCRIPTION_ID:指定订阅的 ID。
    • TOPIC_ID:指定主题的 ID。主题需要架构。
    • PROJECT_ID:指定项目的 ID。
    • DATASET_ID:指定现有数据集的 ID。如需创建数据集,请参阅 创建数据集
    • TABLE_ID:指定现有表的 ID。 如果您的主题没有架构,则表需要包含 data 字段。如需创建表,请参阅使用架构定义创建空表

C++

在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 C++ 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C++ API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string const& topic_id,
   std::string const& subscription_id, std::string const& table_id) {
  google::pubsub::v1::Subscription request;
  request.set_name(
      pubsub::Subscription(project_id, subscription_id).FullName());
  request.set_topic(pubsub::Topic(project_id, topic_id).FullName());
  request.mutable_bigquery_config()->set_table(table_id);
  auto sub = client.CreateSubscription(request);
  if (!sub) {
    if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
      std::cout << "The subscription already exists\n";
      return;
    }
    throw std::move(sub).status();
  }

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";
}

C#

在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证


using Google.Cloud.PubSub.V1;

public class CreateBigQuerySubscriptionSample
{
    public Subscription CreateBigQuerySubscription(string projectId, string topicId, string subscriptionId, string bigqueryTableId)
    {
        SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);

        var subscriptionRequest = new Subscription
        {
            SubscriptionName = subscriptionName,
            TopicAsTopicName = topicName,
            BigqueryConfig = new BigQueryConfig
            {
                Table = bigqueryTableId
            }
        };
        var subscription = subscriber.CreateSubscription(subscriptionRequest);
        return subscription;
    }
}

Go

在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

// createBigQuerySubscription creates a Pub/Sub subscription that exports messages to BigQuery.
func createBigQuerySubscription(w io.Writer, projectID, subID string, topic *pubsub.Topic, table string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	// topic of type https://godoc.org/cloud.google.com/go/pubsub#Topic
	// table := "my-project-id.dataset_id.table_id"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub, err := client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{
		Topic: topic,
		BigQueryConfig: pubsub.BigQueryConfig{
			Table:         table,
			WriteMetadata: true,
		},
	})
	if err != nil {
		return fmt.Errorf("client.CreateSubscription: %w", err)
	}
	fmt.Fprintf(w, "Created BigQuery subscription: %v\n", sub)

	return nil
}

Java

在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.BigQueryConfig;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateBigQuerySubscriptionExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";
    String bigqueryTableId = "your-project.your-dataset.your-table";

    createBigQuerySubscription(projectId, topicId, subscriptionId, bigqueryTableId);
  }

  public static void createBigQuerySubscription(
      String projectId, String topicId, String subscriptionId, String bigqueryTableId)
      throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      BigQueryConfig bigqueryConfig =
          BigQueryConfig.newBuilder().setTable(bigqueryTableId).setWriteMetadata(true).build();

      Subscription subscription =
          subscriptionAdminClient.createSubscription(
              Subscription.newBuilder()
                  .setName(subscriptionName.toString())
                  .setTopic(topicName.toString())
                  .setBigqueryConfig(bigqueryConfig)
                  .build());

      System.out.println("Created a BigQuery subscription: " + subscription.getAllFields());
    }
  }
}

Node.js

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const bigqueryTableId = 'YOUR_TABLE_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createBigQuerySubscription(
  topicNameOrId,
  subscriptionNameOrId,
  bigqueryTableId
) {
  const options = {
    bigqueryConfig: {
      table: bigqueryTableId,
      writeMetadata: true,
    },
  };

  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, options);

  console.log(`Subscription ${subscriptionNameOrId} created.`);
}

Node.js

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const bigqueryTableId = 'YOUR_TABLE_ID';

// Imports the Google Cloud client library
import {PubSub, CreateSubscriptionOptions} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createBigQuerySubscription(
  topicNameOrId: string,
  subscriptionNameOrId: string,
  bigqueryTableId: string
) {
  const options: CreateSubscriptionOptions = {
    bigqueryConfig: {
      table: bigqueryTableId,
      writeMetadata: true,
    },
  };

  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, options);

  console.log(`Subscription ${subscriptionNameOrId} created.`);
}

PHP

在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 PHP 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub PHP API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

use Google\Cloud\PubSub\PubSubClient;
use Google\Cloud\PubSub\V1\BigQueryConfig;

/**
 * Creates a Pub/Sub BigQuery subscription.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 * @param string $table      The BigQuery table to which to write.
 */
function create_bigquery_subscription($projectId, $topicName, $subscriptionName, $table)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $topic = $pubsub->topic($topicName);
    $subscription = $topic->subscription($subscriptionName);
    $config = new BigQueryConfig(['table' => $table]);
    $subscription->create([
        'bigqueryConfig' => $config
    ]);

    printf('Subscription created: %s' . PHP_EOL, $subscription->name());
}

Python

在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"
# bigquery_table_id = "your-project.your-dataset.your-table"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

bigquery_config = pubsub_v1.types.BigQueryConfig(
    table=bigquery_table_id, write_metadata=True
)

# Wrap the subscriber in a 'with' block to automatically call close() to
# close the underlying gRPC channel when done.
with subscriber:
    subscription = subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic_path,
            "bigquery_config": bigquery_config,
        }
    )

print(f"BigQuery subscription created: {subscription}.")
print(f"Table for subscription is: {bigquery_table_id}")

Ruby

在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Ruby 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Ruby API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

require "google/cloud/pubsub"

##
# Shows how to create a BigQuery subscription where messages published
# to a topic populates a BigQuery table.
#
# @param project_id [String]
# Your Google Cloud project (e.g. "my-project")
# @param topic_id [String]
# Your topic name (e.g. "my-secret")
# @param subscription_id [String]
# ID for new subscription to be created (e.g. "my-subscription")
# @param bigquery_table_id [String]
# ID of bigquery table (e.g "my-project:dataset-id.table-id")
#
def pubsub_create_bigquery_subscription project_id:, topic_id:, subscription_id:, bigquery_table_id:
  pubsub = Google::Cloud::Pubsub.new project_id: project_id
  topic = pubsub.topic topic_id
  subscription = topic.subscribe subscription_id,
                                 bigquery_config: {
                                   table: bigquery_table_id,
                                   write_metadata: true
                                 }
  puts "BigQuery subscription created: #{subscription_id}."
  puts "Table for subscription is: #{bigquery_table_id}"
end

监控 BigQuery 订阅

Cloud Monitoring 提供了一些指标来监控订阅

如需查看与 Pub/Sub 相关的所有可用指标及其说明的列表,请参阅 Pub/Sub 监控文档

您还可以在 Pub/Sub 中监控订阅。

后续步骤