使用舊版串流 API

本文件說明如何使用舊版 tabledata.insertAll 方法,將資料以串流方式傳送至 BigQuery。

對於新專案,建議您使用 BigQuery Storage Write API,而非 tabledata.insertAll 方法。Storage Write API 的定價較低且功能齊全 (包括單次傳送語意),如果您要將現有專案從 tabledata.insertAll 方法遷移至 Storage Write API,建議您選取預設串流。系統仍完整支援 tabledata.insertAll 方法。

事前準備

  1. 確認您具備目的地資料表所屬資料集的寫入權限。除非您使用範本資料表,否則該資料表在開始寫入資料之前就必須存在。如要進一步瞭解範本資料表,請參閱使用範本資料表自動建立資料表

  2. 查看串流資料的配額政策

  3. Make sure that billing is enabled for your Google Cloud project.

  4. 免費方案並不支援資料串流功能。如果不啟用計費功能,當您嘗試進行資料串流作業時,系統會顯示下列錯誤訊息:BigQuery: Streaming insert is not allowed in the free tier.

  5. 授予身分與存取權管理 (IAM) 角色,讓使用者取得執行本文件中各項工作的必要權限。

所需權限

如要將資料以串流方式傳入 BigQuery,您需要具備下列 IAM 權限:

  • bigquery.tables.updateData (可讓您在資料表中插入資料)
  • bigquery.tables.get (可讓您取得資料表中繼資料)
  • bigquery.datasets.get (可讓您取得資料集中繼資料)
  • bigquery.tables.create (如果您使用範本資料表自動建立資料表,則為必要屬性)

以下每個預先定義的 IAM 角色都包含將資料串流至 BigQuery 所需的權限:

  • roles/bigquery.dataEditor
  • roles/bigquery.dataOwner
  • roles/bigquery.admin

如要進一步瞭解 BigQuery 中的 IAM 角色和權限,請參閱「預先定義的角色與權限」一文。

將資料串流至 BigQuery

C#

在嘗試這個範例之前,請先按照 BigQuery 快速入門:使用用戶端程式庫中的 C# 設定說明進行操作。詳情請參閱 BigQuery C# API 參考說明文件

如要向 BigQuery 進行驗證,請設定應用程式預設憑證。詳情請參閱「設定用戶端程式庫的驗證機制」。


using Google.Cloud.BigQuery.V2;

public class BigQueryTableInsertRows
{
    public void TableInsertRows(
        string projectId = "your-project-id",
        string datasetId = "your_dataset_id",
        string tableId = "your_table_id"
    )
    {
        BigQueryClient client = BigQueryClient.Create(projectId);
        BigQueryInsertRow[] rows = new BigQueryInsertRow[]
        {
            // The insert ID is optional, but can avoid duplicate data
            // when retrying inserts.
            new BigQueryInsertRow(insertId: "row1") {
                { "name", "Washington" },
                { "post_abbr", "WA" }
            },
            new BigQueryInsertRow(insertId: "row2") {
                { "name", "Colorado" },
                { "post_abbr", "CO" }
            }
        };
        client.InsertRows(datasetId, tableId, rows);
    }
}

Go

在嘗試這個範例之前,請先按照 BigQuery 快速入門:使用用戶端程式庫中的 Go 設定說明進行操作。詳情請參閱 BigQuery Go API 參考說明文件

如要向 BigQuery 進行驗證,請設定應用程式預設憑證。詳情請參閱「設定用戶端程式庫的驗證機制」。

import (
	"context"
	"fmt"

	"cloud.google.com/go/bigquery"
)

// Item represents a row item.
type Item struct {
	Name string
	Age  int
}

// Save implements the ValueSaver interface.
// This example disables best-effort de-duplication, which allows for higher throughput.
func (i *Item) Save() (map[string]bigquery.Value, string, error) {
	return map[string]bigquery.Value{
		"full_name": i.Name,
		"age":       i.Age,
	}, bigquery.NoDedupeID, nil
}

// insertRows demonstrates inserting data into a table using the streaming insert mechanism.
func insertRows(projectID, datasetID, tableID string) error {
	// projectID := "my-project-id"
	// datasetID := "mydataset"
	// tableID := "mytable"
	ctx := context.Background()
	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("bigquery.NewClient: %w", err)
	}
	defer client.Close()

	inserter := client.Dataset(datasetID).Table(tableID).Inserter()
	items := []*Item{
		// Item implements the ValueSaver interface.
		{Name: "Phred Phlyntstone", Age: 32},
		{Name: "Wylma Phlyntstone", Age: 29},
	}
	if err := inserter.Put(ctx, items); err != nil {
		return err
	}
	return nil
}

Java

在嘗試這個範例之前,請先按照 BigQuery 快速入門:使用用戶端程式庫中的 Java 設定說明進行操作。詳情請參閱 BigQuery Java API 參考說明文件

如要向 BigQuery 進行驗證,請設定應用程式預設憑證。詳情請參閱「設定用戶端程式庫的驗證機制」。

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sample to inserting rows into a table without running a load job.
public class TableInsertRows {

  public static void main(String[] args) {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    // Create a row to insert
    Map<String, Object> rowContent = new HashMap<>();
    rowContent.put("booleanField", true);
    rowContent.put("numericField", "3.14");
    // TODO(developer): Replace the row id with a unique value for each row.
    String rowId = "ROW_ID";
    tableInsertRows(datasetName, tableName, rowId, rowContent);
  }

  public static void tableInsertRows(
      String datasetName, String tableName, String rowId, Map<String, Object> rowContent) {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

      // Get table
      TableId tableId = TableId.of(datasetName, tableName);

      // Inserts rowContent into datasetName:tableId.
      InsertAllResponse response =
          bigquery.insertAll(
              InsertAllRequest.newBuilder(tableId)
                  // More rows can be added in the same RPC by invoking .addRow() on the builder.
                  // You can omit the unique row ids to disable de-duplication.
                  .addRow(rowId, rowContent)
                  .build());

      if (response.hasErrors()) {
        // If any of the insertions failed, this lets you inspect the errors
        for (Map.Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
          System.out.println("Response error: \n" + entry.getValue());
        }
      }
      System.out.println("Rows successfully inserted into table");
    } catch (BigQueryException e) {
      System.out.println("Insert operation not performed \n" + e.toString());
    }
  }
}

Node.js

在嘗試這個範例之前,請先按照 BigQuery 快速入門:使用用戶端程式庫中的 Node.js 設定說明進行操作。詳情請參閱 BigQuery Node.js API 參考說明文件

如要向 BigQuery 進行驗證,請設定應用程式預設憑證。詳情請參閱「設定用戶端程式庫的驗證機制」。

// Import the Google Cloud client library
const {BigQuery} = require('@google-cloud/bigquery');
const bigquery = new BigQuery();

async function insertRowsAsStream() {
  // Inserts the JSON objects into my_dataset:my_table.

  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // const datasetId = 'my_dataset';
  // const tableId = 'my_table';
  const rows = [
    {name: 'Tom', age: 30},
    {name: 'Jane', age: 32},
  ];

  // Insert data into a table
  await bigquery.dataset(datasetId).table(tableId).insert(rows);
  console.log(`Inserted ${rows.length} rows`);
}

PHP

在嘗試這個範例之前,請先按照 BigQuery 快速入門:使用用戶端程式庫中的 PHP 設定說明進行操作。詳情請參閱 BigQuery PHP API 參考說明文件

如要向 BigQuery 進行驗證,請設定應用程式預設憑證。詳情請參閱「設定用戶端程式庫的驗證機制」。

use Google\Cloud\BigQuery\BigQueryClient;

/**
 * Stream data into bigquery
 *
 * @param string $projectId The project Id of your Google Cloud Project.
 * @param string $datasetId The BigQuery dataset ID.
 * @param string $tableId The BigQuery table ID.
 * @param string $data Json encoded data For eg,
 *    $data = json_encode([
 *       "field1" => "value1",
 *       "field2" => "value2",
 *    ]);
 */
function stream_row(
    string $projectId,
    string $datasetId,
    string $tableId,
    string $data
): void {
    // instantiate the bigquery table service
    $bigQuery = new BigQueryClient([
      'projectId' => $projectId,
    ]);
    $dataset = $bigQuery->dataset($datasetId);
    $table = $dataset->table($tableId);

    $data = json_decode($data, true);
    $insertResponse = $table->insertRows([
      ['data' => $data],
      // additional rows can go here
    ]);
    if ($insertResponse->isSuccessful()) {
        print('Data streamed into BigQuery successfully' . PHP_EOL);
    } else {
        foreach ($insertResponse->failedRows() as $row) {
            foreach ($row['errors'] as $error) {
                printf('%s: %s' . PHP_EOL, $error['reason'], $error['message']);
            }
        }
    }
}

Python

在嘗試這個範例之前,請先按照 BigQuery 快速入門:使用用戶端程式庫中的 Python 設定說明進行操作。詳情請參閱 BigQuery Python API 參考說明文件

如要向 BigQuery 進行驗證,請設定應用程式預設憑證。詳情請參閱「設定用戶端程式庫的驗證機制」。

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of table to append to.
# table_id = "your-project.your_dataset.your_table"

rows_to_insert = [
    {"full_name": "Phred Phlyntstone", "age": 32},
    {"full_name": "Wylma Phlyntstone", "age": 29},
]

errors = client.insert_rows_json(table_id, rows_to_insert)  # Make an API request.
if errors == []:
    print("New rows have been added.")
else:
    print("Encountered errors while inserting rows: {}".format(errors))

Ruby

在嘗試這個範例之前,請先按照 BigQuery 快速入門:使用用戶端程式庫中的 Ruby 設定說明進行操作。詳情請參閱 BigQuery Ruby API 參考說明文件

如要向 BigQuery 進行驗證,請設定應用程式預設憑證。詳情請參閱「設定用戶端程式庫的驗證機制」。

require "google/cloud/bigquery"

def table_insert_rows dataset_id = "your_dataset_id", table_id = "your_table_id"
  bigquery = Google::Cloud::Bigquery.new
  dataset  = bigquery.dataset dataset_id
  table    = dataset.table table_id

  row_data = [
    { name: "Alice", value: 5  },
    { name: "Bob",   value: 10 }
  ]
  response = table.insert row_data

  if response.success?
    puts "Inserted rows successfully"
  else
    puts "Failed to insert #{response.error_rows.count} rows"
  end
end

插入資料列時,您不需要在 insertID 欄位中填入資料。以下範例說明如何避免在串流時,為每個資料列傳送 insertID

Java

在嘗試這個範例之前,請先按照 BigQuery 快速入門:使用用戶端程式庫中的 Java 設定說明進行操作。詳情請參閱 BigQuery Java API 參考說明文件

如要向 BigQuery 進行驗證,請設定應用程式預設憑證。詳情請參閱「設定用戶端程式庫的驗證機制」。

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import com.google.common.collect.ImmutableList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sample to insert rows without row ids in a table
public class TableInsertRowsWithoutRowIds {

  public static void main(String[] args) {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    tableInsertRowsWithoutRowIds(datasetName, tableName);
  }

  public static void tableInsertRowsWithoutRowIds(String datasetName, String tableName) {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      final BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
      // Create rows to insert
      Map<String, Object> rowContent1 = new HashMap<>();
      rowContent1.put("stringField", "Phred Phlyntstone");
      rowContent1.put("numericField", 32);
      Map<String, Object> rowContent2 = new HashMap<>();
      rowContent2.put("stringField", "Wylma Phlyntstone");
      rowContent2.put("numericField", 29);
      InsertAllResponse response =
          bigquery.insertAll(
              InsertAllRequest.newBuilder(TableId.of(datasetName, tableName))
                  // No row ids disable de-duplication, and also disable the retries in the Java
                  // library.
                  .setRows(
                      ImmutableList.of(
                          InsertAllRequest.RowToInsert.of(rowContent1),
                          InsertAllRequest.RowToInsert.of(rowContent2)))
                  .build());

      if (response.hasErrors()) {
        // If any of the insertions failed, this lets you inspect the errors
        for (Map.Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
          System.out.println("Response error: \n" + entry.getValue());
        }
      }
      System.out.println("Rows successfully inserted into table without row ids");
    } catch (BigQueryException e) {
      System.out.println("Insert operation not performed \n" + e.toString());
    }
  }
}

Python

在嘗試這個範例之前,請先按照 BigQuery 快速入門:使用用戶端程式庫中的 Python 設定說明進行操作。詳情請參閱 BigQuery Python API 參考說明文件

如要向 BigQuery 進行驗證,請設定應用程式預設憑證。詳情請參閱「設定用戶端程式庫的驗證機制」。

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of table to append to.
# table_id = "your-project.your_dataset.your_table"

rows_to_insert = [
    {"full_name": "Phred Phlyntstone", "age": 32},
    {"full_name": "Wylma Phlyntstone", "age": 29},
]

errors = client.insert_rows_json(
    table_id, rows_to_insert, row_ids=[None] * len(rows_to_insert)
)  # Make an API request.
if errors == []:
    print("New rows have been added.")
else:
    print("Encountered errors while inserting rows: {}".format(errors))

傳送日期和時間資料

針對日期和時間欄位,請在 tabledata.insertAll 方法中將資料格式化如下:

類型 格式
DATE 採用 "YYYY-MM-DD" 格式的字串
DATETIME 採用 "YYYY-MM-DD [HH:MM:SS]" 格式的字串
TIME 採用 "HH:MM:SS" 格式的字串
TIMESTAMP 自 1970-01-01 (Unix 紀元) 起的秒數,或 "YYYY-MM-DD HH:MM[:SS]" 格式的字串

傳送範圍資料

如果欄位類型為 RANGE<T>,請將 tabledata.insertAll 方法中的資料格式化為 JSON 物件,其中包含兩個欄位:startendstartend 欄位缺少或為空值,代表無邊界。這些欄位必須使用相同的支援 JSON 格式,格式類型為 T,其中 T 可以是 DATEDATETIMETIMESTAMP 其中之一。

在以下範例中,f_range_date 欄位代表資料表中的 RANGE<DATE> 欄。使用 tabledata.insertAll API 將資料列插入這個資料欄。

{
    "f_range_date": {
        "start": "1970-01-02",
        "end": null
    }
}

串流資料可用性

BigQuery 成功確認 tabledata.insertAll 要求後,即可立即使用 GoogleSQL 查詢即時分析資料。

近期傳送至擷取時間分區資料表的資料列,暫時會在 _PARTITIONTIME 虛擬資料欄中顯示為 NULL 值。針對這類資料列,BigQuery 會在背景中指派 PARTITIONTIME 欄的最終非空值,通常在幾分鐘內完成。在極少數情況下,這項作業最多可能需要 90 分鐘。

部分最近串流的資料列可能無法在幾分鐘內進行表格複製作業。在極少數情況下,這項作業最多可能需要 90 分鐘。如要查看資料是否可用於表格複製,請檢查 tables.get 回應中名為 streamingBuffer 的部分。如果沒有 streamingBuffer 部分,表示資料可供複製。您也可以使用 streamingBuffer.oldestEntryTime 欄位,識別串流緩衝區中的記錄存續時間。

盡可能清除重複的功能

當您為插入的資料列提供 insertId 時,BigQuery 會使用這個 ID 盡可能在最多一分鐘內刪除重複的資料。也就是說,如果您在該時間範圍內,將具有相同 insertId 的資料列串流至同一個資料表超過一次,BigQuery 可能會刪除該資料列的多個重複項目,只保留其中一個。

系統會預期提供相同 insertId 的資料列也相同。如果兩個資料列的 insertId 相同,BigQuery 會保留哪一列資料列並非確定性。

一般來說,去重作業是用於分散式系統中的重試情境,在某些錯誤情況下 (例如系統與 BigQuery 之間的網路發生錯誤,或是 BigQuery 發生內部錯誤),無法確定串流資料插入作業的狀態。如果您重試插入作業,請針對同一組資料列使用相同的 insertId,以便 BigQuery 嘗試刪除資料重複部分。詳情請參閱排解串流資料插入的相關問題

BigQuery 提供的去重機制是盡力而為,不應視為保證資料中沒有重複項目的機制。此外,BigQuery 可能會隨時降低最佳努力去重複資料的品質,以確保資料的可靠性和可用性。

如果您對資料有嚴格的去重複要求,可改為使用支援交易Google Cloud Datastore 服務。

停用盡可能清除重複的功能

您可以透過不在每個插入的資料列中填入 insertId 欄位來停用盡可能清除重複的功能。這是建議的資料插入方式。

Apache Beam 和 Dataflow

如要在使用 Apache Beam 的 BigQuery I/O 連接器 (適用於 Java) 時,停用盡力去重功能,請使用 ignoreInsertIds() 方法

手動移除重複內容

如要確保串流完成後不會出現重複的資料列,請使用下列手動程序:

  1. 在資料表結構定義中加入 insertId 做為資料欄,並在每個資料列的資料中加入 insertId 值。
  2. 在串流作業停止後,執行下列查詢以檢查是否有重複內容:

    #standardSQL
    SELECT
      MAX(count) FROM(
      SELECT
        ID_COLUMN,
        count(*) as count
      FROM
        `TABLE_NAME`
      GROUP BY
        ID_COLUMN)

    假如結果大於 1,表示有重複的項目。
  3. 如要移除重複項目,請執行下列查詢。指定目標資料表、允許大型結果,並停用結果扁平化。

    #standardSQL
    SELECT
      * EXCEPT(row_number)
    FROM (
      SELECT
        *,
        ROW_NUMBER()
              OVER (PARTITION BY ID_COLUMN) row_number
      FROM
        `TABLE_NAME`)
    WHERE
      row_number = 1

以下是重複項目移除查詢的相關注意事項:

  • 保守的重複項目移除查詢策略是指定一份新資料表,或者,您也可以指定包含 WRITE_TRUNCATE 寫入配置的來源資料表。
  • 重複項目移除查詢會將 row_number 資料欄 (包含 1 這個值) 加到資料表結構定義尾端。這項查詢使用 GoogleSQLSELECT * EXCEPT 陳述式,從目的地資料表中排除 row_number 資料欄。#standardSQL 前置字串會為此查詢啟用 GoogleSQL。此外,您也可以選取特定的資料欄名稱來略過此資料欄。
  • 如要在移除重複項目後查詢即時資料,您也可以使用重複項目移除查詢來對資料表建立資料檢視。請注意,資料檢視的查詢成本是根據在資料檢視中選取的資料欄計算,這可能會導致位元組掃描大小過大。

以串流方式傳入時間分區資料表

將資料以串流方式傳送至時間分區資料表時,每個分區都會有一個串流緩衝區。如將 writeDisposition 屬性設為 WRITE_TRUNCATE,當您在執行會覆寫分區的載入、查詢或複製工作時,系統會保留串流緩衝區。如果您要移除串流緩衝區,請對分區呼叫 tables.get,以確認串流緩衝區是空的。

擷取時間分區

當您串流至擷取時間分區資料表時,BigQuery 會根據目前的世界標準時間推斷目的地分區。

新抵達的資料會暫時放置在串流緩衝區中的 __UNPARTITIONED__ 分區。未分區資料累積到足夠的大小後,BigQuery 就會將資料分區到正確的分區。不過,資料從 __UNPARTITIONED__ 分區移出所需的時間並未有服務水準協議。因此,查詢可以使用虛擬資料欄 (_PARTITIONTIME_PARTITIONDATE,視您偏好的資料類型而定) 從 __UNPARTITIONED__ 分區篩選出 NULL 值,藉此從查詢中排除串流緩衝區的資料。

如果您要將資料以串流方式傳送至每日分區資料表,則可在 insertAll 要求中提供分區修飾符,藉此覆寫日期推論。在 tableId 參數中加入修飾符。例如,您可以使用分區修飾符,將資料串流至資料表 table1 的 2021-03-01 對應分區:

table1$20210301

使用分區修飾符以串流方式傳輸資料時,可以根據目前的世界標準時間,以串流方式將資料傳輸至過去 31 天和未來 16 天之間 (相對於目前日期) 的分區。如要針對前述允許範圍外的日期寫入分區,請改用載入或查詢工作,如附加並覆寫分區資料表資料一文所述。

使用分區修飾符串流傳輸資料時,系統僅支援每日分區資料表。不支援每小時、每月或每年分區的資料表。

如要進行測試,您可以使用 bq 指令列工具 bq insert CLI 指令。例如,以下指令會將單一列以串流方式傳入 mydataset.mytable 分區資料表中日期為 2017 年 1 月 1 日 ($20170101) 的分區:

echo '{"a":1, "b":2}' | bq insert 'mydataset.mytable$20170101'

時間單位資料欄分區

您可以將資料串流至以 DATEDATETIMETIMESTAMP 欄分區的資料表,該欄的資料範圍為過去 10 年至未來 1 年。超出這個範圍的資料會遭到拒絕。

資料進行串流時,一開始會置於 __UNPARTITIONED__ 分區。當未分區資料累積到足夠的量時,BigQuery 就會自動重新分區資料,並將資料放入適當的分區。不過,資料從 __UNPARTITIONED__ 分區移出所需的時間並未有服務水準協議。

  • 注意:系統處理每日區隔的方式與處理每小時、每月和每年區隔的方式不同。只有日期範圍 (過去 7 天到未來 3 天) 以外的資料會擷取至 UNPARTITIONED 分區,等待重新分區。另一方面,如果是每小時分區資料表,資料一律會擷取至 UNPARTITIONED 分區,之後再重新分區。

使用範本資料表自動建立資料表

範本資料表提供一種機制,可將邏輯資料表分成許多較小的資料表,以便建立較小的資料集 (例如,透過使用者 ID)。範本表格有許多限制,詳情請參閱下文。建議您使用分區資料表叢集資料表來實現這項行為。

如要透過 BigQuery API 使用範本資料表,請將 templateSuffix 參數新增到 insertAll 要求。針對 bq 指令列工具,請在 insert 指令中加入 template_suffix 旗標。假如 BigQuery 偵測到 templateSuffix 參數或 template_suffix 標記,就會將指定資料表視為基礎範本。系統會建立一份結構定義與指定資料表相同的新資料表,而且該資料表具有包含指定後置字元的名稱:

<targeted_table_name> + <templateSuffix>

只要使用範本資料表,您就可以省去個別建立資料表以及為每個資料表指定結構定義的負擔。您只需要建立一個範本,並提供不同的後置字元,BigQuery 就能為您建立新資料表。BigQuery 會將資料表放在相同的專案和資料集中。

使用範本建立的資料表通常會在幾秒內完成。在極少數情況下,可能需要更長的時間才能使用。

變更範本資料表結構定義

如果您變更範本資料表結構定義,後續產生的所有資料表都會使用更新後的結構定義。先前產生的資料表不會受到影響,除非現有資料表仍有串流緩衝區。

就仍有串流緩衝區的現有資料表而言,假如您以回溯相容的方式修改範本資料表結構定義,主動串流產生的資料表結構定義也會隨之更新。不過,如果您以非回溯相容的方式修改範本資料表結構定義,使用舊結構定義的所有緩衝資料都將會遺失。此外,如果已產生的現有資料表使用現已不相容的舊結構定義,您就無法將新資料串流傳入表內。

變更範本資料表結構定義後,請等待變更完成傳播,再嘗試插入新資料或查詢產生的資料表。插入新欄位的要求應該會在幾分鐘內成功,而查詢新欄位的嘗試可能需要最多 90 分鐘。

假如您想變更已產生資料表的結構定義,請等到透過範本資料表進行的串流作業停止,且已產生資料表的串流統計資料區塊已不存在 tables.get() 回應中 (表示資料表已沒有緩衝資料),否則請勿變更結構定義。

分區資料表叢集資料表不受上述限制,是建議使用的機制。

範本資料表詳細資料

範本後置值
templateSuffix (或 --template_suffix) 值只能包含英文字母 (a-z、A-Z)、數字 (0-9) 或底線 (_)。表格名稱和表格副檔名的總長度上限為 1024 個半形字元。
配額

範本表格須遵守串流配額限制。您的專案最多可使用範本表格每秒產生 10 個表格,類似 tables.insert API。這項配額僅適用於建立的資料表,不適用於修改的資料表。

如果應用程式需要每秒建立超過 10 個資料表,建議您使用叢集資料表。舉例來說,您可以將高基數資料表 ID 放入單一叢集資料表的鍵欄。

存留時間 (TTL)

已產生的資料表會沿用資料集的到期時間。已產生的資料表跟一般的串流資料一樣,無法立即複製。

刪除重複資料

系統只會針對目的地資料表的相同參照內容刪除重複資料。舉例來說,如果您同時使用範本資料表和一般 insertAll 指令,將資料串流至產生的資料表,則範本資料表和一般 insertAll 指令插入的資料列之間不會發生重複資料移除作業。

瀏覽次數

範本資料表和產生的資料表不得設為檢視畫面。

排解串流資料插入問題

接下來的幾個小節將討論,如何排解您使用舊版串流 API 將資料串流至 BigQuery時所發生的錯誤。如要進一步瞭解如何解決串流資料插入配額錯誤,請參閱「串流資料插入配額錯誤」。

失敗的 HTTP 回應代碼

如果您收到失敗的 HTTP 回應碼 (例如網路錯誤),就無法判斷串流插入作業是否成功。如果您嘗試重新傳送要求,可能會導致資料表中出現重複的資料列。為防止資料表出現重複的內容,請在傳送要求時設定 insertId 屬性。BigQuery 會利用 insertId 屬性來清除重複的內容。

如果您收到權限錯誤、資料表名稱無效的錯誤,或是超過配額的錯誤,代表系統沒有插入任何資料列,且整個要求都執行失敗。

成功的 HTTP 回應代碼

即使您收到 成功的 HTTP 回應代碼,還是需要查看回應的 insertErrors 屬性,以便判斷資料欄的插入作業是否已執行成功,因為 BigQuery 有可能只成功插入了部分資料列。您可能會遇到下列任一情況:

  • 已成功插入所有資料列。如果 insertErrors 屬性是空白清單,代表所有資料列都已成功插入。
  • 部分資料列已成功插入。除非發生任何資料列中結構定義不相符的情況,否則在 insertErrors 屬性中列出的資料列都不會插入資料表,而所有其他的資料列都已成功插入。errors 屬性會針對每個插入失敗的資料列,提供失敗的詳細原因。index 屬性會針對發生該錯誤的要求,提供從 0 開始的資料列索引。
  • 未成功插入任何資料列。如果 BigQuery 在處理要求中個別的資料列時碰到結構定義不相符的情況,就不會插入任何資料列,並針對每個資料列傳回 insertErrors 項目,即使結構定義沒有不相符的資料列也算在內。沒有結構定義不相符的資料列會有 reason 屬性設定為 stopped 的錯誤,且可讓您照原樣重新傳送。而失敗的資料列會包含關於結構定義不相符的詳細資訊。如要瞭解每個 BigQuery 資料類型支援的通訊協定緩衝區類型,請參閱「資料類型轉換」。

串流資料插入的中繼資料錯誤

由於 BigQuery 的串流 API 是針對高插入率所設計,因此對基礎資料表中繼資料所做的修改最終會在與串流系統互動時保持一致。大多數情況下,中繼資料變更會在幾分鐘內生效,但在這段期間,API 回應可能會反映出不一致的資料表狀態。

部分情況包括:

  • 結構定義變更。當您為最近受到串流資料插入的資料表修改結構定義時,可能會收到指出結構定義不相符錯誤的回應,這是因為串流系統可能無法立刻反映出結構定義的變更。
  • 資料表建立/刪除:串流至不存在的資料表會傳回 notFound 回應的變化形式。而後續的串流資料插入作業,可能無法立刻辨識出在回應中建立的資料表。同樣地,刪除或重新建立資料表時,串流資料插入可能會在一段時間內傳送至舊資料表。新資料表中可能沒有串流插入項目。
  • 表格截斷。同樣地,截斷資料表的資料 (例如查詢工作會使用 WRITE_TRUNCATE 的 writeDisposition 時),可能會導致後續插入的一致性出現落差。

遺失/無法取得資料

串流插入內容會暫時存放在以寫入為優先的儲存空間中,這種儲存空間的可用性特徵與代管儲存空間不同。BigQuery 中的某些作業不會與寫入最佳化儲存空間互動,例如資料表複製作業和 tabledata.list 等 API 方法。近期的串流資料不會出現在目的地資料表或輸出內容中。