从 Cloud Storage 加载 Parquet 数据

本页面概述了如何将 Cloud Storage 中的 Parquet 数据加载到 BigQuery 中。

Parquet 是一种面向列的开源数据格式,在 Apache Hadoop 生态系统中得到广泛运用。

从 Cloud Storage 加载 Parquet 数据时,可以将数据加载到新的表或分区中,也可以将其附加到或覆盖现有的表或分区。数据加载到 BigQuery 后,系统会将其转换为适用于 Capacitor 的列式格式(BigQuery 的存储格式)。

如需将 Cloud Storage 中的数据加载到 BigQuery 表,则包含该表的数据集必须与相应 Cloud Storage 存储桶位于同一区域或多区域位置。

如需了解如何从本地文件加载 Parquet 数据,请参阅从本地文件加载数据

限制

将数据从 Cloud Storage 存储桶加载到 BigQuery 时,需要遵循以下限制:

  • 如果您的数据集位置设置为 US 多区域以外的值,则 Cloud Storage 存储桶必须与数据集位于同一单区域中或包含在同一多区域内。
  • BigQuery 不保证外部数据源的数据一致性。在查询运行的过程中,底层数据的更改可能会导致意外行为。
  • BigQuery 不支持 Cloud Storage 对象版本控制。如果您在 Cloud Storage URI 中添加了世代编号,则加载作业将失败。

  • 加载 Parquet 数据时,需要遵循列命名惯例,默认情况下不支持灵活的列名称。如需注册此预览版,请填写注册表单

  • 如果要加载的任何文件具有不同的架构,则无法在 Cloud Storage URI 中使用通配符。列位置的任何差异都属于不同的架构。

输入文件要求

将 Parquet 文件加载到 BigQuery 时,请遵循以下准则,以避免 resourcesExceeded 错误:

  • 行大小不得超过 50 MB。
  • 如果输入数据包含的列数超过 100 个,请考虑将页面大小减小到低于默认页面大小(1 * 1024 * 1024 个字节)。如果您使用的是大量压缩,这样做就特别有用。
  • 为获得最佳性能,请将行组大小设为至少 16 MiB。 行组大小越小,I/O 就越多,加载和查询速度就越慢。

准备工作

授予为用户提供执行本文档中的每个任务所需权限的 Identity and Access Management (IAM) 角色,并创建一个数据集来存储您的数据。

所需权限

如需将数据加载到 BigQuery,您需要拥有 IAM 权限才能运行加载作业以及将数据加载到 BigQuery 表和分区中。如果要从 Cloud Storage 加载数据,您还需要拥有访问包含数据的存储桶的 IAM 权限。

将数据加载到 BigQuery 的权限

如需将数据加载到新的 BigQuery 表或分区中,或者附加或覆盖现有的表或分区,您需要拥有以下 IAM 权限:

  • bigquery.tables.create
  • bigquery.tables.updateData
  • bigquery.tables.update
  • bigquery.jobs.create

以下预定义 IAM 角色都具有将数据加载到 BigQuery 表或分区所需的权限:

  • roles/bigquery.dataEditor
  • roles/bigquery.dataOwner
  • roles/bigquery.admin(包括 bigquery.jobs.create 权限)
  • bigquery.user(包括 bigquery.jobs.create 权限)
  • bigquery.jobUser(包括 bigquery.jobs.create 权限)

此外,如果您拥有 bigquery.datasets.create 权限,则可以在自己创建的数据集中使用加载作业创建和更新表。

如需详细了解 BigQuery 中的 IAM 角色和权限,请参阅预定义的角色和权限

从 Cloud Storage 加载数据的权限

如需获得从 Cloud Storage 存储桶加载数据所需的权限,请让您的管理员为您授予存储桶的 Storage Admin (roles/storage.admin) IAM 角色。如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

此预定义角色可提供从 Cloud Storage 存储桶加载数据所需的权限。如需查看所需的确切权限,请展开所需权限部分:

所需权限

如需从 Cloud Storage 存储桶加载数据,您需要具备以下权限:

  • storage.buckets.get
  • storage.objects.get
  • storage.objects.list (required if you are using a URI wildcard)

您也可以使用自定义角色或其他预定义角色来获取这些权限。

创建数据集

创建 BigQuery 数据集来存储数据。

Parquet 架构

在您将 Parquet 文件加载到 BigQuery 时,系统会自动从自描述源数据中检索表架构。BigQuery 从源数据中检索架构时,将使用按字母顺序显示的最后一个文件。

例如,您在 Cloud Storage 中具有以下 Parquet 文件:

gs://mybucket/00/
  a.parquet
  z.parquet
gs://mybucket/01/
  b.parquet

在 bq 命令行工具中运行此命令时,系统会加载所有文件(以英文逗号分隔列表形式),而架构来源于 mybucket/01/b.parquet

bq load \
--source_format=PARQUET \
dataset.table \
"gs://mybucket/00/*.parquet","gs://mybucket/01/*.parquet"

加载具有不同架构的多个 Parquet 文件时,多个架构中指定的相同列必须在每个架构定义中拥有相同的模式

当 BigQuery 检测架构时,会将某些 Parquet 数据类型转换为 BigQuery 数据类型,使其与 GoogleSQL 语法兼容。如需了解详情,请参阅 Parquet 转换

如需提供用于创建外部表的表架构,请将 BigQuery API 中的 referenceFileSchemaUri 属性或 bq 命令行工具中的
--reference_file_schema_uri 参数设置为参考文件的网址。

例如 --reference_file_schema_uri="gs://mybucket/schema.parquet"

Parquet 压缩

BigQuery 支持对 Parquet 文件内容使用以下压缩编解码器:

  • GZip
  • LZO_1C
  • LZO_1X
  • LZ4_RAW
  • Snappy
  • ZSTD

将 Parquet 数据加载到新表

您可以使用以下方式之一将 Parquet 数据加载到新表中:

  • Google Cloud 控制台
  • bq 命令行工具的 bq load 命令
  • 调用 jobs.insert API 方法并配置 load 作业
  • 客户端库

如需将 Parquet 数据从 Cloud Storage 加载到新的 BigQuery 表中,请执行以下操作:

控制台

  1. 在 Google Cloud 控制台中,转到 BigQuery 页面。

    转到 BigQuery

  2. 浏览器窗格中,展开您的项目,然后选择数据集。
  3. 数据集信息部分中,点击 创建表
  4. 创建表面板中,指定以下详细信息:
    1. 来源部分中,从基于以下数据源创建表列表中选择 Google Cloud Storage。之后,执行以下操作:
      1. 从 Cloud Storage 存储桶中选择一个文件,或输入 Cloud Storage URI。您无法在 Google Cloud 控制台中添加多个 URI,但支持使用通配符。Cloud Storage 存储桶必须与您要创建、附加或覆盖的表所属的数据集位于同一位置。 选择源文件以创建 BigQuery 表
      2. 文件格式部分,选择 Parquet
    2. 目标部分,指定以下详细信息:
      1. 数据集部分,选择您要在其中创建表的数据集。
      2. 字段中,输入您要创建的表的名称。
      3. 确认表类型字段是否设置为原生表
    3. 架构部分中,无需执行任何操作。架构在 Parquet 文件中为自描述形式。
    4. 可选:指定分区和聚簇设置。如需了解详情,请参阅创建分区表创建和使用聚簇表
    5. 点击高级选项,然后执行以下操作:
      • 写入偏好设置部分,选中只写入空白表。此选项创建一个新表并向其中加载数据。
      • 如果要忽略表架构中不存在的行中的值,请选择未知值
      • 加密部分,点击客户管理的密钥,以使用 Cloud Key Management Service 密钥。如果保留 Google 管理的密钥设置,BigQuery 将对静态数据进行加密
    6. 点击创建表

SQL

使用 LOAD DATA DDL 语句. 以下示例将 Parquet 文件加载到新表 mytable 中:

  1. 在 Google Cloud 控制台中,转到 BigQuery 页面。

    转到 BigQuery

  2. 在查询编辑器中,输入以下语句:

    LOAD DATA OVERWRITE mydataset.mytable
    FROM FILES (
      format = 'PARQUET',
      uris = ['gs://bucket/path/file.parquet']);

  3. 点击 运行

如需详细了解如何运行查询,请参阅运行交互式查询

bq

使用 bq load 命令,通过 --source_format 标志指定 PARQUET,并添加 Cloud Storage URI。您可以添加单个 URI、以英文逗号分隔的 URI 列表或含有通配符的 URI。

(可选)提供 --location 标志并将其值设置为您的位置

其他可选标志包括:

  • --time_partitioning_type:此标志会在表上启用基于时间的分区,并设置分区类型。可能的值包括 HOURDAYMONTHYEAR。当您创建按 DATEDATETIMETIMESTAMP 列分区的表时,可选用此标志。基于时间的分区的默认分区类型为 DAY。 您无法更改现有表上的分区规范。
  • --time_partitioning_expiration:此标志值为一个整数,指定了应在何时删除基于时间的分区(以秒为单位)。到期时间以分区的世界协调时间 (UTC) 日期加上这个整数值为准。
  • --time_partitioning_field:此标志表示用于创建分区表的 DATETIMESTAMP 列。如果在未提供此值的情况下启用了基于时间的分区,系统会创建注入时间分区表。
  • --require_partition_filter:启用后,此选项会要求用户添加 WHERE 子句来指定要查询的分区。要求使用分区过滤条件可以减少费用并提高性能。如需了解详情,请参阅在查询中要求使用分区过滤条件
  • --clustering_fields:此标志表示以英文逗号分隔的列名称列表(最多包含 4 个列名称),用于创建聚簇表
  • --destination_kms_key:用于加密表数据的 Cloud KMS 密钥。
  • --column_name_character_map:定义列名称中的字符的范围和处理方式,并可选择启用灵活的列名称。如需了解详情,请参阅 load_option_list

    如需详细了解分区表,请参阅:

    如需详细了解聚簇表,请参阅:

    如需详细了解表加密,请参阅以下部分:

如需将 Parquet 数据加载到 BigQuery,请输入以下命令:

bq --location=LOCATION load \
--source_format=FORMAT \
DATASET.TABLE \
PATH_TO_SOURCE

替换以下内容:

  • LOCATION:您所在的位置。--location 是可选标志。例如,如果您在东京区域使用 BigQuery,可将该标志的值设置为 asia-northeast1。您可以使用 .bigqueryrc 文件设置位置的默认值。
  • FORMATPARQUET
  • DATASET:现有数据集。
  • TABLE:要向其中加载数据的表的名称。
  • PATH_TO_SOURCE 是完全限定的 Cloud Storage URI 或以英文逗号分隔的 URI 列表。系统也支持使用通配符

示例:

以下命令将 gs://mybucket/mydata.parquet 中的数据加载到 mydataset 中名为 mytable 的表中。

    bq load \
    --source_format=PARQUET \
    mydataset.mytable \
    gs://mybucket/mydata.parquet

以下命令将 gs://mybucket/mydata.parquet 中的数据加载到 mydataset 中名为 mytable 的新注入时间分区表中。

    bq load \
    --source_format=PARQUET \
    --time_partitioning_type=DAY \
    mydataset.mytable \
    gs://mybucket/mydata.parquet

以下命令将 gs://mybucket/mydata.parquet 中的数据加载到 mydataset 中名为 mytable 的分区表中。该表按 mytimestamp 列进行分区。

    bq load \
    --source_format=PARQUET \
    --time_partitioning_field mytimestamp \
    mydataset.mytable \
    gs://mybucket/mydata.parquet

以下命令将 gs://mybucket/ 中多个文件的数据加载到 mydataset 中名为 mytable 的表中。Cloud Storage URI 使用通配符。

    bq load \
    --source_format=PARQUET \
    mydataset.mytable \
    gs://mybucket/mydata*.parquet

以下命令将 gs://mybucket/ 中多个文件的数据加载到 mydataset 中名为 mytable 的表中。该命令包含以英文逗号分隔的 Cloud Storage URI 列表(含通配符)。

    bq load \
    --source_format=PARQUET \
    mydataset.mytable \
    "gs://mybucket/00/*.parquet","gs://mybucket/01/*.parquet"

API

  1. 创建指向 Cloud Storage 中源数据的 load 作业。

  2. (可选)在作业资源 jobReference 部分的 location 属性中指定您的位置

  3. source URIs 属性必须是完全限定的,格式为 gs://BUCKET/OBJECT。每个 URI 都可以包含一个“*”通配符

  4. sourceFormat 属性设置为 PARQUET,以指定 Parquet 数据格式。

  5. 如需检查作业状态,请调用 jobs.get(JOB_ID*),并将 JOB_ID 替换为初始请求返回的作业的 ID。

    • 如果 status.state = DONE,则表示作业已成功完成。
    • 如果出现 status.errorResult 属性,则表示请求失败,并且该对象将包含描述问题的相关信息。如果请求失败,则不创建任何表且不加载任何数据。
    • 如果未出现 status.errorResult,则表示作业已成功完成,但可能存在一些非严重错误,如导入一些行时出错。非严重错误会列在返回的作业对象的 status.errors 属性中。

API 说明

  • 加载作业兼具原子性和一致性:如果加载作业失败,则所有数据都不可用;如果加载作业成功,则所有数据都可用。

  • 通过调用 jobs.insert 来创建加载作业时,最佳做法是生成唯一 ID,并将其作为 jobReference.jobId 传递。此方法受网络故障影响较小,因为客户端可以对已知的作业 ID 进行轮询或重试。

  • 对指定的作业 ID 调用 jobs.insert 具有幂等性。您可以对同一作业 ID 进行无限次重试,但最多只会有一个成功操作。

Go

试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 Go 设置说明进行操作。 如需了解详情,请参阅 BigQuery Go API 参考文档

如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

import (
	"context"
	"fmt"

	"cloud.google.com/go/bigquery"
)

// importParquet demonstrates loading Apache Parquet data from Cloud Storage into a table.
func importParquet(projectID, datasetID, tableID string) error {
	// projectID := "my-project-id"
	// datasetID := "mydataset"
	// tableID := "mytable"
	ctx := context.Background()
	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("bigquery.NewClient: %v", err)
	}
	defer client.Close()

	gcsRef := bigquery.NewGCSReference("gs://cloud-samples-data/bigquery/us-states/us-states.parquet")
	gcsRef.SourceFormat = bigquery.Parquet
	gcsRef.AutoDetect = true
	loader := client.Dataset(datasetID).Table(tableID).LoaderFrom(gcsRef)

	job, err := loader.Run(ctx)
	if err != nil {
		return err
	}
	status, err := job.Wait(ctx)
	if err != nil {
		return err
	}

	if status.Err() != nil {
		return fmt.Errorf("job completed with error: %v", status.Err())
	}
	return nil
}

Java

试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 Java 设置说明进行操作。 如需了解详情,请参阅 BigQuery Java API 参考文档

如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.LoadJobConfiguration;
import com.google.cloud.bigquery.TableId;
import java.math.BigInteger;

public class LoadParquet {

  public static void runLoadParquet() {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    loadParquet(datasetName);
  }

  public static void loadParquet(String datasetName) {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

      String sourceUri = "gs://cloud-samples-data/bigquery/us-states/us-states.parquet";
      TableId tableId = TableId.of(datasetName, "us_states");

      LoadJobConfiguration configuration =
          LoadJobConfiguration.builder(tableId, sourceUri)
              .setFormatOptions(FormatOptions.parquet())
              .build();

      // For more information on Job see:
      // https://googleapis.dev/java/google-cloud-clients/latest/index.html?com/google/cloud/bigquery/package-summary.html
      // Load the table
      Job job = bigquery.create(JobInfo.of(configuration));

      // Blocks until this load table job completes its execution, either failing or succeeding.
      Job completedJob = job.waitFor();
      if (completedJob == null) {
        System.out.println("Job not executed since it no longer exists.");
        return;
      } else if (completedJob.getStatus().getError() != null) {
        System.out.println(
            "BigQuery was unable to load the table due to an error: \n"
                + job.getStatus().getError());
        return;
      }

      // Check number of rows loaded into the table
      BigInteger numRows = bigquery.getTable(tableId).getNumRows();
      System.out.printf("Loaded %d rows. \n", numRows);

      System.out.println("GCS parquet loaded successfully.");
    } catch (BigQueryException | InterruptedException e) {
      System.out.println("GCS Parquet was not loaded. \n" + e.toString());
    }
  }
}

Node.js

试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 Node.js 设置说明进行操作。 如需了解详情,请参阅 BigQuery Node.js API 参考文档

如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

// Import the Google Cloud client libraries
const {BigQuery} = require('@google-cloud/bigquery');
const {Storage} = require('@google-cloud/storage');

// Instantiate clients
const bigquery = new BigQuery();
const storage = new Storage();

/**
 * This sample loads the Parquet file at
 * https://storage.googleapis.com/cloud-samples-data/bigquery/us-states/us-states.parquet
 *
 * TODO(developer): Replace the following lines with the path to your file.
 */
const bucketName = 'cloud-samples-data';
const filename = 'bigquery/us-states/us-states.parquet';

async function loadTableGCSParquet() {
  // Imports a GCS file into a table with Parquet source format.

  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // const datasetId = 'my_dataset';
  // const tableId = 'my_table';

  // Configure the load job. For full list of options, see:
  // https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad
  const metadata = {
    sourceFormat: 'PARQUET',
    location: 'US',
  };

  // Load data from a Google Cloud Storage file into the table
  const [job] = await bigquery
    .dataset(datasetId)
    .table(tableId)
    .load(storage.bucket(bucketName).file(filename), metadata);

  // load() waits for the job to finish
  console.log(`Job ${job.id} completed.`);

  // Check the job's status for errors
  const errors = job.status.errors;
  if (errors && errors.length > 0) {
    throw errors;
  }
}

PHP

试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 PHP 设置说明进行操作。 如需了解详情,请参阅 BigQuery PHP API 参考文档

如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

use Google\Cloud\BigQuery\BigQueryClient;
use Google\Cloud\Core\ExponentialBackoff;

/** Uncomment and populate these variables in your code */
// $projectId  = 'The Google project ID';
// $datasetId  = 'The BigQuery dataset ID';

// instantiate the bigquery table service
$bigQuery = new BigQueryClient([
    'projectId' => $projectId,
]);
$dataset = $bigQuery->dataset($datasetId);
$table = $dataset->table('us_states');

// create the import job
$gcsUri = 'gs://cloud-samples-data/bigquery/us-states/us-states.parquet';
$loadConfig = $table->loadFromStorage($gcsUri)->sourceFormat('PARQUET');
$job = $table->runJob($loadConfig);
// poll the job until it is complete
$backoff = new ExponentialBackoff(10);
$backoff->execute(function () use ($job) {
    print('Waiting for job to complete' . PHP_EOL);
    $job->reload();
    if (!$job->isComplete()) {
        throw new Exception('Job has not yet completed', 500);
    }
});
// check if the job has errors
if (isset($job->info()['status']['errorResult'])) {
    $error = $job->info()['status']['errorResult']['message'];
    printf('Error running job: %s' . PHP_EOL, $error);
} else {
    print('Data imported successfully' . PHP_EOL);
}

Python

试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 Python 设置说明进行操作。 如需了解详情,请参阅 BigQuery Python API 参考文档

如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

使用 Client.load_table_from_uri() 方法启动从 Cloud Storage 加载数据的作业。如需使用 Parquet,请将 LoadJobConfig.source_format 属性设置为字符串 PARQUET,并将作业配置作为 job_config 参数传递给 load_table_from_uri() 方法。
from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of the table to create.
# table_id = "your-project.your_dataset.your_table_name"

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.PARQUET,
)
uri = "gs://cloud-samples-data/bigquery/us-states/us-states.parquet"

load_job = client.load_table_from_uri(
    uri, table_id, job_config=job_config
)  # Make an API request.

load_job.result()  # Waits for the job to complete.

destination_table = client.get_table(table_id)
print("Loaded {} rows.".format(destination_table.num_rows))

使用 Parquet 数据覆盖或附加到表

您可以通过添加来自源文件的数据或附加查询结果,将其他数据加载到表中。

在 Google Cloud 控制台中,使用写入偏好设置选项指定从源文件或查询结果加载数据时要执行的操作。

将其他数据加载到表中时,可选择以下选项:

控制台选项 bq 工具标志 BigQuery API 属性 说明
只写入空白表 不支持 WRITE_EMPTY 仅当表为空时才写入数据。
Append to table --noreplace--replace=false;如果未指定 --[no]replace,则默认为附加 WRITE_APPEND 默认)在表末尾附加数据。
覆盖表 --replace--replace=true WRITE_TRUNCATE 清空表中所有现有数据然后再写入新数据。 此操作还会删除表架构和行级安全性,并移除所有 Cloud KMS 密钥。

如果将数据加载到现有表中,加载作业可以附加数据或覆盖表。

您可以使用以下方式之一对表执行附加或覆盖操作:

  • Google Cloud 控制台
  • bq 命令行工具的 bq load 命令
  • 调用 jobs.insert API 方法并配置 load 作业
  • 客户端库

如需使用 Parquet 数据附加数据或覆盖表,请执行以下操作:

控制台

  1. 在 Google Cloud 控制台中,转到 BigQuery 页面。

    转到 BigQuery

  2. 浏览器窗格中,展开您的项目,然后选择数据集。
  3. 数据集信息部分中,点击 创建表
  4. 创建表面板中,指定以下详细信息:
    1. 来源部分中,从基于以下数据源创建表列表中选择 Google Cloud Storage。之后,执行以下操作:
      1. 从 Cloud Storage 存储桶中选择一个文件,或输入 Cloud Storage URI。您无法在 Google Cloud 控制台中添加多个 URI,但支持使用通配符。Cloud Storage 存储桶必须与您要创建、附加或覆盖的表所属的数据集位于同一位置。 选择源文件以创建 BigQuery 表
      2. 文件格式部分,选择 Parquet
    2. 目标部分,指定以下详细信息:
      1. 数据集部分,选择您要在其中创建表的数据集。
      2. 字段中,输入您要创建的表的名称。
      3. 确认表类型字段是否设置为原生表
    3. 架构部分中,无需执行任何操作。架构在 Parquet 文件中为自描述形式。
    4. 可选:指定分区和聚簇设置。如需了解详情,请参阅创建分区表创建和使用聚簇表。 您无法通过附加或覆盖表将表转换为分区表或聚簇表。Google Cloud 控制台不支持在加载作业中对分区表或聚簇表执行附加或覆盖操作。
    5. 点击高级选项,然后执行以下操作:
      • 写入偏好设置部分,选择附加到表覆盖表
      • 如果要忽略表架构中不存在的行中的值,请选择未知值
      • 加密部分,点击客户管理的密钥,以使用 Cloud Key Management Service 密钥。如果保留 Google 管理的密钥设置,BigQuery 将对静态数据进行加密
    6. 点击创建表

SQL

使用 LOAD DATA DDL 语句. 以下示例将 Parquet 文件附加到表 mytable 中:

  1. 在 Google Cloud 控制台中,转到 BigQuery 页面。

    转到 BigQuery

  2. 在查询编辑器中,输入以下语句:

    LOAD DATA INTO mydataset.mytable
    FROM FILES (
      format = 'PARQUET',
      uris = ['gs://bucket/path/file.parquet']);

  3. 点击 运行

如需详细了解如何运行查询,请参阅运行交互式查询

bq

输入带 --replace 标志的 bq load 命令可以覆盖表。使用 --noreplace 标志可将数据附加到表。如果未指定任何标志,则默认附加数据。您可以添加 --source_format 标志并将其设置为 PARQUET。由于系统会自动从自描述源数据中检索 Parquet 架构,因此您无需提供架构定义。

(可选)提供 --location 标志并将其值设置为您的位置

其他可选标志包括:

  • --destination_kms_key:用于加密表数据的 Cloud KMS 密钥。
bq --location=LOCATION load \
--[no]replace \
--source_format=FORMAT \
DATASET.TABLE \
PATH_TO_SOURCE

替换以下内容:

  • location:您所在的位置--location 是可选标志。您可以使用 .bigqueryrc 文件设置位置的默认值。
  • formatPARQUET
  • dataset:现有数据集。
  • table:要向其中加载数据的表的名称。
  • path_to_source 是完全限定的 Cloud Storage URI 或以英文逗号分隔的 URI 列表。系统也支持使用通配符

示例:

以下命令可从 gs://mybucket/mydata.parquet 加载数据并覆盖 mydataset 中名为 mytable 的表。

    bq load \
    --replace \
    --source_format=PARQUET \
    mydataset.mytable \
    gs://mybucket/mydata.parquet

以下命令可从 gs://mybucket/mydata.parquet 加载数据,并将数据附加到 mydataset 中名为 mytable 的表。

    bq load \
    --noreplace \
    --source_format=PARQUET \
    mydataset.mytable \
    gs://mybucket/mydata.parquet

如需了解如何使用 bq 命令行工具附加和覆盖分区表,请参阅对分区表数据执行附加和覆盖操作

API

  1. 创建指向 Cloud Storage 中源数据的 load 作业。

  2. (可选)在作业资源 jobReference 部分的 location 属性中指定您的位置

  3. source URIs 属性必须是完全限定的,格式为 gs://BUCKET/OBJECT。您可以采用英文逗号分隔列表的形式添加多个 URI。请注意,系统也支持通配符

  4. configuration.load.sourceFormat 属性设置为 PARQUET,以指定数据格式。

  5. configuration.load.writeDisposition 属性设置为 WRITE_TRUNCATEWRITE_APPEND,以指定写入偏好设置。

Go

试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 Go 设置说明进行操作。 如需了解详情,请参阅 BigQuery Go API 参考文档

如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

import (
	"context"
	"fmt"

	"cloud.google.com/go/bigquery"
)

// importParquetTruncate demonstrates loading Apache Parquet data from Cloud Storage into a table
// and overwriting/truncating existing data in the table.
func importParquetTruncate(projectID, datasetID, tableID string) error {
	// projectID := "my-project-id"
	// datasetID := "mydataset"
	// tableID := "mytable"
	ctx := context.Background()
	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("bigquery.NewClient: %v", err)
	}
	defer client.Close()

	gcsRef := bigquery.NewGCSReference("gs://cloud-samples-data/bigquery/us-states/us-states.parquet")
	gcsRef.SourceFormat = bigquery.Parquet
	gcsRef.AutoDetect = true
	loader := client.Dataset(datasetID).Table(tableID).LoaderFrom(gcsRef)
	loader.WriteDisposition = bigquery.WriteTruncate

	job, err := loader.Run(ctx)
	if err != nil {
		return err
	}
	status, err := job.Wait(ctx)
	if err != nil {
		return err
	}

	if status.Err() != nil {
		return fmt.Errorf("job completed with error: %v", status.Err())
	}
	return nil
}

Java

试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 Java 设置说明进行操作。 如需了解详情,请参阅 BigQuery Java API 参考文档

如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证


import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.JobInfo.WriteDisposition;
import com.google.cloud.bigquery.LoadJobConfiguration;
import com.google.cloud.bigquery.TableId;
import java.math.BigInteger;

public class LoadParquetReplaceTable {

  public static void runLoadParquetReplaceTable() {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    loadParquetReplaceTable(datasetName);
  }

  public static void loadParquetReplaceTable(String datasetName) {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

      // Imports a GCS file into a table and overwrites table data if table already exists.
      // This sample loads CSV file at:
      // https://storage.googleapis.com/cloud-samples-data/bigquery/us-states/us-states.csv
      String sourceUri = "gs://cloud-samples-data/bigquery/us-states/us-states.parquet";
      TableId tableId = TableId.of(datasetName, "us_states");

      // For more information on LoadJobConfiguration see:
      // https://googleapis.dev/java/google-cloud-clients/latest/com/google/cloud/bigquery/LoadJobConfiguration.Builder.html
      LoadJobConfiguration configuration =
          LoadJobConfiguration.builder(tableId, sourceUri)
              .setFormatOptions(FormatOptions.parquet())
              // Set the write disposition to overwrite existing table data.
              .setWriteDisposition(WriteDisposition.WRITE_TRUNCATE)
              .build();

      // For more information on Job see:
      // https://googleapis.dev/java/google-cloud-clients/latest/index.html?com/google/cloud/bigquery/package-summary.html
      // Load the table
      Job job = bigquery.create(JobInfo.of(configuration));

      // Load data from a GCS parquet file into the table
      // Blocks until this load table job completes its execution, either failing or succeeding.
      Job completedJob = job.waitFor();
      if (completedJob == null) {
        System.out.println("Job not executed since it no longer exists.");
        return;
      } else if (completedJob.getStatus().getError() != null) {
        System.out.println(
            "BigQuery was unable to load into the table due to an error: \n"
                + job.getStatus().getError());
        return;
      }

      // Check number of rows loaded into the table
      BigInteger numRows = bigquery.getTable(tableId).getNumRows();
      System.out.printf("Loaded %d rows. \n", numRows);

      System.out.println("GCS parquet overwrote existing table successfully.");
    } catch (BigQueryException | InterruptedException e) {
      System.out.println("Table extraction job was interrupted. \n" + e.toString());
    }
  }
}

Node.js

试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 Node.js 设置说明进行操作。 如需了解详情,请参阅 BigQuery Node.js API 参考文档

如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

// Import the Google Cloud client libraries
const {BigQuery} = require('@google-cloud/bigquery');
const {Storage} = require('@google-cloud/storage');

// Instantiate clients
const bigquery = new BigQuery();
const storage = new Storage();

/**
 * This sample loads the CSV file at
 * https://storage.googleapis.com/cloud-samples-data/bigquery/us-states/us-states.csv
 *
 * TODO(developer): Replace the following lines with the path to your file.
 */
const bucketName = 'cloud-samples-data';
const filename = 'bigquery/us-states/us-states.parquet';

async function loadParquetFromGCSTruncate() {
  /**
   * Imports a GCS file into a table and overwrites
   * table data if table already exists.
   */

  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // const datasetId = "my_dataset";
  // const tableId = "my_table";

  // Configure the load job. For full list of options, see:
  // https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad
  const metadata = {
    sourceFormat: 'PARQUET',
    // Set the write disposition to overwrite existing table data.
    writeDisposition: 'WRITE_TRUNCATE',
    location: 'US',
  };

  // Load data from a Google Cloud Storage file into the table
  const [job] = await bigquery
    .dataset(datasetId)
    .table(tableId)
    .load(storage.bucket(bucketName).file(filename), metadata);
  // load() waits for the job to finish
  console.log(`Job ${job.id} completed.`);

  // Check the job's status for errors
  const errors = job.status.errors;
  if (errors && errors.length > 0) {
    throw errors;
  }
}

PHP

试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 PHP 设置说明进行操作。 如需了解详情,请参阅 BigQuery PHP API 参考文档

如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

use Google\Cloud\BigQuery\BigQueryClient;
use Google\Cloud\Core\ExponentialBackoff;

/** Uncomment and populate these variables in your code */
// $projectId = 'The Google project ID';
// $datasetId = 'The BigQuery dataset ID';
// $tableID = 'The BigQuery table ID';

// instantiate the bigquery table service
$bigQuery = new BigQueryClient([
    'projectId' => $projectId,
]);
$table = $bigQuery->dataset($datasetId)->table($tableId);

// create the import job
$gcsUri = 'gs://cloud-samples-data/bigquery/us-states/us-states.parquet';
$loadConfig = $table->loadFromStorage($gcsUri)->sourceFormat('PARQUET')->writeDisposition('WRITE_TRUNCATE');
$job = $table->runJob($loadConfig);

// poll the job until it is complete
$backoff = new ExponentialBackoff(10);
$backoff->execute(function () use ($job) {
    print('Waiting for job to complete' . PHP_EOL);
    $job->reload();
    if (!$job->isComplete()) {
        throw new Exception('Job has not yet completed', 500);
    }
});

// check if the job has errors
if (isset($job->info()['status']['errorResult'])) {
    $error = $job->info()['status']['errorResult']['message'];
    printf('Error running job: %s' . PHP_EOL, $error);
} else {
    print('Data imported successfully' . PHP_EOL);
}

Python

试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 Python 设置说明进行操作。 如需了解详情,请参阅 BigQuery Python API 参考文档

如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

如需将行附加到现有表,请将 LoadJobConfig.write_disposition 属性设置为 WRITE_APPEND

如需替换现有表中的行,请将 LoadJobConfig.write_disposition 属性设置为 WRITE_TRUNCATE

import io

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of the table to create.
# table_id = "your-project.your_dataset.your_table_name

job_config = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField("name", "STRING"),
        bigquery.SchemaField("post_abbr", "STRING"),
    ],
)

body = io.BytesIO(b"Washington,WA")
client.load_table_from_file(body, table_id, job_config=job_config).result()
previous_rows = client.get_table(table_id).num_rows
assert previous_rows > 0

job_config = bigquery.LoadJobConfig(
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    source_format=bigquery.SourceFormat.PARQUET,
)

uri = "gs://cloud-samples-data/bigquery/us-states/us-states.parquet"
load_job = client.load_table_from_uri(
    uri, table_id, job_config=job_config
)  # Make an API request.

load_job.result()  # Waits for the job to complete.

destination_table = client.get_table(table_id)
print("Loaded {} rows.".format(destination_table.num_rows))

加载 Hive 分区的 Parquet 数据

BigQuery 支持加载存储在 Cloud Storage 中的 Hive 分区 Parquet 数据,并将 Hive 分区列作为目标 BigQuery 代管表中的列进行填充。如需了解详情,请参阅加载外部分区数据

Parquet 转换

本部分介绍 BigQuery 如何在加载 Parquet 数据时解析各种数据类型。

某些 Parquet 数据类型(例如 INT32INT64BYTE_ARRAYFIXED_LEN_BYTE_ARRAY)可以转换为多个 BigQuery 数据类型。要确保 BigQuery 正确转换 Parquet 数据类型,请在 Parquet 文件中指定适当的数据类型。

比方说,如需将 Parquet INT32 数据类型转换为 BigQuery DATE 数据类型,请指定以下内容:

optional int32 date_col (DATE);

BigQuery 会将 Parquet 数据类型转换为以下部分中介绍的 BigQuery 数据类型。

类型转换

Parquet 类型 Parquet 逻辑类型 BigQuery 数据类型
BOOLEAN BOOLEAN
INT32 无、INTEGERUINT_8UINT_16UINT_32INT_8INT_16INT_32 INT64
INT32 DECIMAL NUMERIC、BIGNUMER 或 STRING
INT32 DATE DATE
INT64 无、INTEGERUINT_64INT_64 INT64
INT64 DECIMAL NUMERIC、BIGNUMER 或 STRING
INT64 TIMESTAMPprecision=MILLIS (TIMESTAMP_MILLIS) TIMESTAMP
INT64 TIMESTAMPprecision=MICROS (TIMESTAMP_MICROS) TIMESTAMP
INT96 TIMESTAMP
FLOAT FLOAT64
DOUBLE FLOAT64
BYTE_ARRAY BYTES
BYTE_ARRAY STRING (UTF8) STRING
FIXED_LEN_BYTE_ARRAY DECIMAL NUMERIC、BIGNUMER 或 STRING
FIXED_LEN_BYTE_ARRAY BYTES

嵌套群组会转换为 STRUCT 类型。系统不支持 Parquet 类型和转换类型的其他组合。

无符号逻辑类型

Parquet UINT_8UINT_16UINT_32UINT_64 类型是无符号的。加载到 BigQuery 签名的 INTEGER 列中时,BigQuery 会将这些类型的值视为无符号。对于 UINT_64,如果无符号值超过 INTEGER 最大值 9,223,372,036,854,775,807,则返回错误。

decimal 逻辑类型

Decimal 逻辑类型可以转换为 NUMERICBIGNUMERICSTRING 类型。转换后的类型取决于 decimal 逻辑类型的精度和比例参数以及指定的 decimal 目标类型。按如下方式指定小数目标类型:

Enum 逻辑类型

Enum 逻辑类型可以转换为 STRINGBYTES。按如下方式指定转换目标类型:

List 逻辑类型

您可以为 Parquet LIST 逻辑类型启用架构推断。BigQuery 会检查 LIST 节点是否采用标准形式,或采用向后兼容性规则所述的形式之一:

// standard form
<optional | required> group <name> (LIST) {
  repeated group list {
    <optional | required> <element-type> element;
  }
}

如果是,则转换后架构中 LIST 节点的相应字段将被视为节点具有以下架构:

repeated <element-type> <name>

节点“list”和“element”被省略。

地理空间数据

您可以通过指定类型为 GEOGRAPHY 的 BigQuery 架构来加载 STRING 列中包含 WKT、十六进制编码的 WKBGeoJSON,或者 BYTE_ARRAY 列中包含 WKB 的 Parquet 文件。如需了解详情,请参阅加载地理空间数据

您还可以加载 GeoParquet 文件。在这种情况下,GeoParquet 元数据描述的列默认会被解读为类型 GEOGRAPHY。您还可以通过提供显式架构,将原始 WKB 数据加载到 BYTES 列中。如需了解详情,请参阅加载 GeoParquet 文件

列名称转换

列名称可以包含字母(a-z、A-Z)、数字 (0-9) 或下划线 (_),并且必须以字母或下划线开头。如果使用灵活的列名称,则 BigQuery 支持列名称以数字开头。列以数字开头时要格外注意,因为将灵活的列名称与 BigQuery Storage Read API 或 BigQuery Storage Write API 搭配使用需要特殊处理。如需详细了解灵活的列名称支持,请参阅灵活的列名称

列名称的长度不得超过 300 个字符。列名称不能使用以下任何前缀:

  • _TABLE_
  • _FILE_
  • _PARTITION
  • _ROW_TIMESTAMP
  • __ROOT__
  • _COLIDENTIFIER

列名不可重复,即使其大小写不同也不行。例如,名为 Column1 的列和名为 column1 的列被视作相同。如需详细了解列命名规则,请参阅 GoogleSQL 参考文档中的列名称

如果表名称(例如 test)与其一个列名称(例如 test)相同,则 SELECT 表达式会将 test 列解释为包含所有其他表列的 STRUCT。如需避免这种冲突,请使用以下方法之一:

  • 避免为表及其列使用相同的名称。

  • 为表分配其他别名。例如,以下查询会为表 project1.dataset.test 分配表别名 t

    SELECT test FROM project1.dataset.test AS t;
    
  • 引用列时添加表名称。例如:

    SELECT test.test FROM project1.dataset.test;
    

灵活的列名称

您可以更灵活地命名列,包括扩展使用非英语字符以及其他符号。

灵活的列名称支持以下字符:

  • 任何语言的任何字母,由 Unicode 正则表达式 \p{L} 表示。
  • 任何语言的任何数字字符,由 Unicode 正则表达式 \p{N} 表示。
  • 任何由 Unicode 正则表达式 \p{Pc} 表示的连接器标点符号(包括下划线)。
  • 连字符或短划线(由 Unicode 正则表达式 \p{Pd} 表示)。
  • 任何伴随另一个字符的标记,由 Unicode 正则表达式 \p{M} 表示。例如重音符号、变音符号或括起的框。
  • 以下特殊字符:
    • 与符号 (&),由 Unicode 正则表达式 \u0026 表示。
    • 百分号 (%),由 Unicode 正则表达式 \u0025 表示。
    • 等号 (=),由 Unicode 正则表达式 \u003D 表示。
    • 加号 (+),由 Unicode 正则表达式 \u002B 表示。
    • 冒号 (:),由 Unicode 正则表达式 \u003A 表示。
    • 撇号 ('),由 Unicode 正则表达式 \u0027 表示。
    • 小于符号 (<),以 Unicode 正则表达式 \u003C 表示。
    • 大于符号 (>),由 Unicode 正则表达式 \u003E 表示。
    • 井号 (#),由 Unicode 正则表达式 \u0023 表示。
    • 竖线 (|),由 Unicode 正则表达式 \u007c 表示。
    • 空格。

灵活的列名称不支持以下特殊字符:

  • 感叹号 (!),由 Unicode 正则表达式 \u0021 表示。
  • 引号 ("),由 Unicode 正则表达式 \u0022 表示。
  • 美元符号 ($),由 Unicode 正则表达式 \u0024 表示。
  • 左括号 ((),由 Unicode 正则表达式 \u0028 表示。
  • 右括号 ()),由 Unicode 正则表达式 \u0029 表示。
  • 星号 (*),由 Unicode 正则表达式 \u002A 表示。
  • 逗号 (,),由 Unicode 正则表达式 \u002C 表示。
  • 由 Unicode 正则表达式 \u002E 表示的英文句号 (.)。
  • 正斜杠 (/),由 Unicode 正则表达式 \u002F 表示。
  • 分号 (;),由 Unicode 正则表达式 \u003B 表示。
  • 问号 (?),由 Unicode 正则表达式 \u003F 表示。
  • 由 Unicode 正则表达式 \u0040 表示的 @ 符号 (@)。
  • 左方括号 ([),由 Unicode 正则表达式 \u005B 表示。
  • 反斜杠 (\),由 Unicode 正则表达式 \u005C 表示。
  • 右方括号 (]),由 Unicode 正则表达式 \u005D 表示。
  • 扬抑符 (^),由 Unicode 正则表达式 \u005E 表示。
  • 重音符号 (`),由 Unicode 正则表达式 \u0060 表示。
  • 左花括号 ({),由 Unicode 正则表达式 \u007B 表示。
  • 右花括号 (}),由 Unicode 正则表达式 \u007D 表示。
  • 波浪号 (~),由 Unicode 正则表达式 \u007E 表示。

如需了解其他准则,请参阅列名称

BigQuery Storage Read API 和 BigQuery Storage Write API 都支持扩展的列字符。如需将扩展的 Unicode 字符列表与 BigQuery Storage Read API 搭配使用,您必须设置标志。您可以使用 displayName 属性检索列名称。以下示例展示了如何使用 Python 客户端来设置标志:

from google.cloud.bigquery_storage import types
requested_session = types.ReadSession()

#set avro serialization options for flexible column.
options = types.AvroSerializationOptions()
options.enable_display_name_attribute = True
requested_session.read_options.avro_serialization_options = options

如需将扩展的 Unicode 字符列表与 BigQuery Storage Write API 搭配使用,您必须提供采用 column_name 表示法的架构,除非您使用的是 JsonStreamWriter 写入器对象。以下示例展示了如何提供架构:

syntax = "proto2";
package mypackage;
// Source protos located in github.com/googleapis/googleapis
import "google/cloud/bigquery/storage/v1/annotations.proto";

message FlexibleSchema {
  optional string item_name_column = 1
  [(.google.cloud.bigquery.storage.v1.column_name) = "name-列"];
  optional string item_description_column = 2
  [(.google.cloud.bigquery.storage.v1.column_name) = "description-列"];
}

在此示例中,item_name_columnitem_description_column 是需要遵循协议缓冲区命名惯例的占位名称。请注意,column_name 注解始终优先于占位名称。

限制

外部表不支持灵活的列名称。

如果 Parquet 文件中的某些列名称中包含英文句点 (.),则您不能加载此文件。

如果 Parquet 列名称包含其他字符(英文句点除外),系统会用下划线替换这些字符。您可以在列名称末尾处添加下划线,以避免冲突。例如,如果一个 Parquet 文件包含 Column1column1 这两列,那么系统会将这些列分别加载为 Column1column1_

调试 Parquet 文件

如果加载作业因数据错误而失败,您可以使用 PyArrow 验证 Parquet 数据文件是否已损坏。如果 PyArrow 无法读取文件,BigQuery 加载作业可能会拒绝这些文件。以下示例展示了如何使用 PyArrow 读取 Parquet 文件的内容:

from pyarrow import parquet as pq

# Read the entire file
pq.read_table('your_sample_file.parquet')
# Read specific columns
pq.read_table('your_sample_file.parquet',columns=['some_column', 'another_column'])
# Read the metadata of specific columns
file_metadata=pq.read_metadata('your_sample_file.parquet')
for col in file_metadata.row_group(0).to_dict()['columns']:
    print col['column_path_in_schema']
    print col['num_values']

如需了解详情,请参阅 PyArrow 文档