기존 스트리밍 API 사용

이 문서에서는 기존 tabledata.insertAll 메서드를 사용하여 데이터를 BigQuery로 스트리밍하는 방법을 설명합니다.

새 프로젝트의 경우 tabledata.insertAll 메서드 대신 BigQuery Storage Write API를 사용하는 것이 좋습니다. Storage Write API는 정확히 1회 전송 시맨틱스를 비롯해 더 저렴한 가격과 더 강력한 기능을 제공합니다. 기존 프로젝트를 tabledata.insertAll 메서드에서 Storage Write API로 마이그레이션하는 경우 기본 스트림을 선택하는 것이 좋습니다. tabledata.insertAll 메서드는 계속해서 완벽히 지원됩니다.

시작하기 전에

  1. 대상 테이블이 포함된 데이터 세트에 대한 쓰기 액세스 권한이 있는지 확인합니다. 템플릿 테이블을 사용하는 경우를 제외하고 데이터를 쓰기 전에 테이블이 존재해야 합니다. 템플릿 테이블에 대한 자세한 내용은 템플릿 테이블을 사용하여 자동으로 테이블 만들기를 참조하세요.

  2. 스트리밍 데이터의 할당량 정책을 확인합니다.

  3. Make sure that billing is enabled for your Google Cloud project.

  4. 무료 등급으로는 스트리밍을 사용할 수 없습니다. 결제를 사용 설정하지 않고 스트리밍을 사용하면 BigQuery: Streaming insert is not allowed in the free tier. 오류가 발생합니다.

  5. 사용자에게 이 문서의 각 작업을 수행하는 데 필요한 권한을 부여하는 Identity and Access Management(IAM) 역할을 부여합니다.

필수 권한

BigQuery로 데이터를 스트리밍하려면 다음 IAM 권한이 필요합니다.

  • bigquery.tables.updateData(테이블에 데이터 삽입 가능)
  • bigquery.tables.get(테이블 메타데이터를 가져올 수 있음)
  • bigquery.datasets.get(데이터 세트 메타데이터를 가져올 수 있음)
  • bigquery.tables.create(템플릿 테이블을 사용하여 테이블을 자동으로 만드는 경우 필요)

사전 정의된 다음 각 IAM 역할에는 BigQuery로 데이터를 스트리밍하는 데 필요한 권한이 포함되어 있습니다.

  • roles/bigquery.dataEditor
  • roles/bigquery.dataOwner
  • roles/bigquery.admin

BigQuery의 IAM 역할과 권한에 대한 자세한 내용은 사전 정의된 역할 및 권한을 참조하세요.

BigQuery에 데이터 스트리밍

C#

이 샘플을 사용해 보기 전에 BigQuery 빠른 시작: 클라이언트 라이브러리 사용C# 설정 안내를 따르세요. 자세한 내용은 BigQuery C# API 참고 문서를 확인하세요.

BigQuery에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 클라이언트 라이브러리의 인증 설정을 참조하세요.


using Google.Cloud.BigQuery.V2;

public class BigQueryTableInsertRows
{
    public void TableInsertRows(
        string projectId = "your-project-id",
        string datasetId = "your_dataset_id",
        string tableId = "your_table_id"
    )
    {
        BigQueryClient client = BigQueryClient.Create(projectId);
        BigQueryInsertRow[] rows = new BigQueryInsertRow[]
        {
            // The insert ID is optional, but can avoid duplicate data
            // when retrying inserts.
            new BigQueryInsertRow(insertId: "row1") {
                { "name", "Washington" },
                { "post_abbr", "WA" }
            },
            new BigQueryInsertRow(insertId: "row2") {
                { "name", "Colorado" },
                { "post_abbr", "CO" }
            }
        };
        client.InsertRows(datasetId, tableId, rows);
    }
}

Go

이 샘플을 사용해 보기 전에 BigQuery 빠른 시작: 클라이언트 라이브러리 사용Go 설정 안내를 따르세요. 자세한 내용은 BigQuery Go API 참고 문서를 확인하세요.

BigQuery에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 클라이언트 라이브러리의 인증 설정을 참조하세요.

import (
	"context"
	"fmt"

	"cloud.google.com/go/bigquery"
)

// Item represents a row item.
type Item struct {
	Name string
	Age  int
}

// Save implements the ValueSaver interface.
// This example disables best-effort de-duplication, which allows for higher throughput.
func (i *Item) Save() (map[string]bigquery.Value, string, error) {
	return map[string]bigquery.Value{
		"full_name": i.Name,
		"age":       i.Age,
	}, bigquery.NoDedupeID, nil
}

// insertRows demonstrates inserting data into a table using the streaming insert mechanism.
func insertRows(projectID, datasetID, tableID string) error {
	// projectID := "my-project-id"
	// datasetID := "mydataset"
	// tableID := "mytable"
	ctx := context.Background()
	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("bigquery.NewClient: %w", err)
	}
	defer client.Close()

	inserter := client.Dataset(datasetID).Table(tableID).Inserter()
	items := []*Item{
		// Item implements the ValueSaver interface.
		{Name: "Phred Phlyntstone", Age: 32},
		{Name: "Wylma Phlyntstone", Age: 29},
	}
	if err := inserter.Put(ctx, items); err != nil {
		return err
	}
	return nil
}

Java

이 샘플을 사용해 보기 전에 BigQuery 빠른 시작: 클라이언트 라이브러리 사용Java 설정 안내를 따르세요. 자세한 내용은 BigQuery Java API 참고 문서를 확인하세요.

BigQuery에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 클라이언트 라이브러리의 인증 설정을 참조하세요.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sample to inserting rows into a table without running a load job.
public class TableInsertRows {

  public static void main(String[] args) {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    // Create a row to insert
    Map<String, Object> rowContent = new HashMap<>();
    rowContent.put("booleanField", true);
    rowContent.put("numericField", "3.14");
    // TODO(developer): Replace the row id with a unique value for each row.
    String rowId = "ROW_ID";
    tableInsertRows(datasetName, tableName, rowId, rowContent);
  }

  public static void tableInsertRows(
      String datasetName, String tableName, String rowId, Map<String, Object> rowContent) {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

      // Get table
      TableId tableId = TableId.of(datasetName, tableName);

      // Inserts rowContent into datasetName:tableId.
      InsertAllResponse response =
          bigquery.insertAll(
              InsertAllRequest.newBuilder(tableId)
                  // More rows can be added in the same RPC by invoking .addRow() on the builder.
                  // You can omit the unique row ids to disable de-duplication.
                  .addRow(rowId, rowContent)
                  .build());

      if (response.hasErrors()) {
        // If any of the insertions failed, this lets you inspect the errors
        for (Map.Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
          System.out.println("Response error: \n" + entry.getValue());
        }
      }
      System.out.println("Rows successfully inserted into table");
    } catch (BigQueryException e) {
      System.out.println("Insert operation not performed \n" + e.toString());
    }
  }
}

Node.js

이 샘플을 사용해 보기 전에 BigQuery 빠른 시작: 클라이언트 라이브러리 사용Node.js 설정 안내를 따르세요. 자세한 내용은 BigQuery Node.js API 참고 문서를 확인하세요.

BigQuery에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 클라이언트 라이브러리의 인증 설정을 참조하세요.

// Import the Google Cloud client library
const {BigQuery} = require('@google-cloud/bigquery');
const bigquery = new BigQuery();

async function insertRowsAsStream() {
  // Inserts the JSON objects into my_dataset:my_table.

  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // const datasetId = 'my_dataset';
  // const tableId = 'my_table';
  const rows = [
    {name: 'Tom', age: 30},
    {name: 'Jane', age: 32},
  ];

  // Insert data into a table
  await bigquery.dataset(datasetId).table(tableId).insert(rows);
  console.log(`Inserted ${rows.length} rows`);
}

PHP

이 샘플을 사용해 보기 전에 BigQuery 빠른 시작: 클라이언트 라이브러리 사용PHP 설정 안내를 따르세요. 자세한 내용은 BigQuery PHP API 참고 문서를 확인하세요.

BigQuery에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 클라이언트 라이브러리의 인증 설정을 참조하세요.

use Google\Cloud\BigQuery\BigQueryClient;

/**
 * Stream data into bigquery
 *
 * @param string $projectId The project Id of your Google Cloud Project.
 * @param string $datasetId The BigQuery dataset ID.
 * @param string $tableId The BigQuery table ID.
 * @param string $data Json encoded data For eg,
 *    $data = json_encode([
 *       "field1" => "value1",
 *       "field2" => "value2",
 *    ]);
 */
function stream_row(
    string $projectId,
    string $datasetId,
    string $tableId,
    string $data
): void {
    // instantiate the bigquery table service
    $bigQuery = new BigQueryClient([
      'projectId' => $projectId,
    ]);
    $dataset = $bigQuery->dataset($datasetId);
    $table = $dataset->table($tableId);

    $data = json_decode($data, true);
    $insertResponse = $table->insertRows([
      ['data' => $data],
      // additional rows can go here
    ]);
    if ($insertResponse->isSuccessful()) {
        print('Data streamed into BigQuery successfully' . PHP_EOL);
    } else {
        foreach ($insertResponse->failedRows() as $row) {
            foreach ($row['errors'] as $error) {
                printf('%s: %s' . PHP_EOL, $error['reason'], $error['message']);
            }
        }
    }
}

Python

이 샘플을 사용해 보기 전에 BigQuery 빠른 시작: 클라이언트 라이브러리 사용Python 설정 안내를 따르세요. 자세한 내용은 BigQuery Python API 참고 문서를 확인하세요.

BigQuery에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 클라이언트 라이브러리의 인증 설정을 참조하세요.

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of table to append to.
# table_id = "your-project.your_dataset.your_table"

rows_to_insert = [
    {"full_name": "Phred Phlyntstone", "age": 32},
    {"full_name": "Wylma Phlyntstone", "age": 29},
]

errors = client.insert_rows_json(table_id, rows_to_insert)  # Make an API request.
if errors == []:
    print("New rows have been added.")
else:
    print("Encountered errors while inserting rows: {}".format(errors))

Ruby

이 샘플을 사용해 보기 전에 BigQuery 빠른 시작: 클라이언트 라이브러리 사용Ruby 설정 안내를 따르세요. 자세한 내용은 BigQuery Ruby API 참고 문서를 확인하세요.

BigQuery에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 클라이언트 라이브러리의 인증 설정을 참조하세요.

require "google/cloud/bigquery"

def table_insert_rows dataset_id = "your_dataset_id", table_id = "your_table_id"
  bigquery = Google::Cloud::Bigquery.new
  dataset  = bigquery.dataset dataset_id
  table    = dataset.table table_id

  row_data = [
    { name: "Alice", value: 5  },
    { name: "Bob",   value: 10 }
  ]
  response = table.insert row_data

  if response.success?
    puts "Inserted rows successfully"
  else
    puts "Failed to insert #{response.error_rows.count} rows"
  end
end

행을 삽입할 때 insertID 필드를 채울 필요가 없습니다. 다음 예시는 스트리밍 시 각 행에 insertID를 전송하지 않는 방법을 보여줍니다.

Java

이 샘플을 사용해 보기 전에 BigQuery 빠른 시작: 클라이언트 라이브러리 사용Java 설정 안내를 따르세요. 자세한 내용은 BigQuery Java API 참고 문서를 확인하세요.

BigQuery에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 클라이언트 라이브러리의 인증 설정을 참조하세요.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import com.google.common.collect.ImmutableList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sample to insert rows without row ids in a table
public class TableInsertRowsWithoutRowIds {

  public static void main(String[] args) {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    tableInsertRowsWithoutRowIds(datasetName, tableName);
  }

  public static void tableInsertRowsWithoutRowIds(String datasetName, String tableName) {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      final BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
      // Create rows to insert
      Map<String, Object> rowContent1 = new HashMap<>();
      rowContent1.put("stringField", "Phred Phlyntstone");
      rowContent1.put("numericField", 32);
      Map<String, Object> rowContent2 = new HashMap<>();
      rowContent2.put("stringField", "Wylma Phlyntstone");
      rowContent2.put("numericField", 29);
      InsertAllResponse response =
          bigquery.insertAll(
              InsertAllRequest.newBuilder(TableId.of(datasetName, tableName))
                  // No row ids disable de-duplication, and also disable the retries in the Java
                  // library.
                  .setRows(
                      ImmutableList.of(
                          InsertAllRequest.RowToInsert.of(rowContent1),
                          InsertAllRequest.RowToInsert.of(rowContent2)))
                  .build());

      if (response.hasErrors()) {
        // If any of the insertions failed, this lets you inspect the errors
        for (Map.Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
          System.out.println("Response error: \n" + entry.getValue());
        }
      }
      System.out.println("Rows successfully inserted into table without row ids");
    } catch (BigQueryException e) {
      System.out.println("Insert operation not performed \n" + e.toString());
    }
  }
}

Python

이 샘플을 사용해 보기 전에 BigQuery 빠른 시작: 클라이언트 라이브러리 사용Python 설정 안내를 따르세요. 자세한 내용은 BigQuery Python API 참고 문서를 확인하세요.

BigQuery에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 클라이언트 라이브러리의 인증 설정을 참조하세요.

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of table to append to.
# table_id = "your-project.your_dataset.your_table"

rows_to_insert = [
    {"full_name": "Phred Phlyntstone", "age": 32},
    {"full_name": "Wylma Phlyntstone", "age": 29},
]

errors = client.insert_rows_json(
    table_id, rows_to_insert, row_ids=[None] * len(rows_to_insert)
)  # Make an API request.
if errors == []:
    print("New rows have been added.")
else:
    print("Encountered errors while inserting rows: {}".format(errors))

날짜 및 시간 데이터 전송

날짜 및 시간 필드의 경우 다음과 같이 tabledata.insertAll 메서드로 데이터 형식을 지정합니다.

유형 형식
DATE "YYYY-MM-DD" 형식의 문자열
DATETIME "YYYY-MM-DD [HH:MM:SS]" 형식의 문자열
TIME "HH:MM:SS" 형식의 문자열
TIMESTAMP 1970-01-01(유닉스 시간) 이후의 초 수 또는 "YYYY-MM-DD HH:MM[:SS]" 형식의 문자열

범위 데이터 전송

RANGE<T> 유형 필드의 경우 startend 필드를 사용해서 tabledata.insertAll 메서드의 데이터 형식을 JSON 객체로 지정합니다. startend 필드의 누락된 값이나 NULL 값은 바인딩되지 않은 경계를 나타냅니다. 이러한 필드는 T 유형의 동일한 지원되는 JSON 형식을 포함해야 합니다. 여기서 TDATE, DATETIME, TIMESTAMP 중 하나일 수 있습니다.

다음 예시에서 f_range_date 필드는 테이블의 RANGE<DATE> 열을 나타냅니다. tabledata.insertAll API를 사용하여 이 열에 행이 삽입됩니다.

{
    "f_range_date": {
        "start": "1970-01-02",
        "end": null
    }
}

스트림 데이터 가용성

BigQuery가 tabledata.insertAll 요청을 성공적으로 확인한 직후에 GoogleSQL 쿼리를 사용하여 실시간 분석에 데이터를 사용할 수 있습니다.

최근에 수집 시간으로 파티션을 나눈 테이블에 스트리밍된 행에는 _PARTITIONTIME 유사 열에 NULL 값이 있습니다. 이러한 행의 경우 BigQuery는 일반적으로 몇 분 내에 백그라운드에서 PARTITIONTIME 열의 최종 NULL이 아닌 값을 할당합니다. 드물지만 최대 90분이 소요될 수도 있습니다.

최근에 스트리밍된 일부 행을 일반적으로 몇 분 동안 테이블 복사에 사용하지 못할 수 있습니다. 드물지만 최대 90분이 소요될 수도 있습니다. 데이터를 테이블 복사에 사용할 수 있는지 여부를 보려면 streamingBuffer 섹션의 tables.get 응답을 확인합니다. streamingBuffer 섹션이 없으면 데이터를 복사에 사용할 수 있습니다. streamingBuffer.oldestEntryTime 필드를 사용하여 스트리밍 버퍼의 레코드 기간을 식별할 수도 있습니다.

최선형 중복 삭제

삽입된 행에 insertId을 입력하면 BigQuery는 이 ID를 사용하여 최대 1분 동안 최선형 중복 삭제를 지원합니다. 즉, 해당 기간 내에 같은 테이블 내에서 같은 insertId를 사용하여 동일한 행을 두 번 넘게 스트리밍하는 경우 BigQuery가 해당 행의 다중 일치 항목을 중복 삭제하고 해당 일치 항목 중 하나만 저장할 수 있습니다.

시스템에서는 동일한 insertId가 제공된 행도 동일할 것으로 예상합니다. 두 행의 insertId가 같다면 BigQuery가 보존하는 행은 확정되어 있지 않습니다.

중복 삭제는 일반적으로 시스템과 BigQuery 사이의 네트워크 오류 또는 BigQuery 내부 오류와 같은 특정 오류 상태에서는 스트리밍 삽입의 상태를 확인할 방법이 없는 배포된 시스템에서 시나리오를 재시도하는 것을 의미합니다. 삽입을 재시도하는 경우 BigQuery가 데이터 중복 삭제를 시도할 수 있도록 동일한 행 조합에 동일한 insertId를 사용하세요. 자세한 내용은 스트리밍 삽입 문제 해결을 참조하세요.

BigQuery에서 제공하는 중복 삭제는 최선의 방법입니다. 또한 데이터에 중복이 없도록 보장하는 메커니즘으로 의존해 사용해서는 안 됩니다. 또한 BigQuery는 데이터의 신뢰성과 가용성을 보장하기 위해 언제든지 최선의 중복 삭제의 품질을 저하시킬 수 있습니다.

데이터의 중복 삭제 요구사항이 엄격한 경우 트랜잭션을 지원하는 대체 서비스로 Google Cloud Datastore를 사용할 수 있습니다.

최선형 중복 삭제 사용 중지

삽입된 각 행에 대해 insertId 필드를 입력하지 않음으로써 최선형 중복 삭제를 사용 중지할 수 있습니다. 데이터 삽입에 권장되는 방법입니다.

Apache Beam 및 Dataflow

Apache Beam의 Java용 BigQuery I/O 커넥터를 사용할 때 최선의 중복 제거를 사용 중지하려면 ignoreInsertIds() 메서드를 사용합니다.

수동으로 중복 제거

스트리밍을 완료한 후 중복 행이 존재하지 않도록 하려면 다음 수동 프로세스를 사용하세요.

  1. insertId를 테이블 스키마에 열로 추가하고 insertId 값을 각 행의 데이터에 포함합니다.
  2. 스트리밍이 중지된 후 다음 쿼리를 수행하여 중복을 확인합니다.

    #standardSQL
    SELECT
      MAX(count) FROM(
      SELECT
        ID_COLUMN,
        count(*) as count
      FROM
        `TABLE_NAME`
      GROUP BY
        ID_COLUMN)

    결과가 1보다 크면 중복이 존재하는 것입니다.
  3. 중복을 제거하려면 다음 쿼리를 실행합니다. 대상 테이블을 지정하고 대량 결과를 허용하고 결과 평면화를 사용 중지합니다.

    #standardSQL
    SELECT
      * EXCEPT(row_number)
    FROM (
      SELECT
        *,
        ROW_NUMBER()
              OVER (PARTITION BY ID_COLUMN) row_number
      FROM
        `TABLE_NAME`)
    WHERE
      row_number = 1

중복 삭제 쿼리 참고사항:

  • 중복 삭제 쿼리를 위한 더 안전한 전략은 새 테이블을 타겟팅하는 것입니다. 또는 쓰기 처리 WRITE_TRUNCATE로 소스 테이블을 타겟팅할 수 있습니다.
  • 중복 삭제 쿼리는 값이 1row_number 열을 테이블 스키마 끝에 추가합니다. 쿼리는 GoogleSQLSELECT * EXCEPT 문을 사용하여 대상 테이블에서 row_number 열을 제외합니다. #standardSQL 프리픽스는 이 쿼리에 GoogleSQL을 사용 설정합니다. 또는 특정 열 이름별로 선택하여 이 열을 생략할 수 있습니다.
  • 중복이 제거된 라이브 데이터를 쿼리하는 경우 중복 제거 쿼리를 사용하여 테이블에서 뷰를 만들 수도 있습니다. 뷰에 대한 쿼리 비용은 뷰에 선택된 열을 기준으로 계산되며 이로 인해 검색된 바이트 크기가 커질 수 있음을 유의하세요.

시간으로 파티션을 나눈 테이블로 스트리밍

시간으로 파티션을 나눈 테이블로 데이터를 스트리밍하면 각 파티션에 스트리밍 버퍼가 발생합니다. writeDisposition 속성을 WRITE_TRUNCATE로 설정하여 파티션을 덮어쓰는 로드, 쿼리 또는 복사 작업을 수행하면 스트리밍 버퍼가 유지됩니다. 스트리밍 버퍼를 삭제하려면 파티션에서 tables.get을 호출하여 스트리밍 버퍼가 비어 있는지 확인합니다.

수집 시간으로 파티션 나누기

수집 시간으로 파티션을 나눈 테이블로 스트리밍할 때 BigQuery는 현재 UTC 시간에서 대상 파티션을 추론합니다.

새로 도착하는 데이터는 스트리밍 버퍼에 있는 동안 임시로 __UNPARTITIONED__ 파티션에 배치됩니다. 파티션으로 나누지 않은 데이터가 충분한 경우 BigQuery는 데이터를 파티션 나누기하여 올바른 파티션에 배치합니다. 하지만 __UNPARTITIONED__ 파티션에서 데이터를 이동하는 데 걸리는 시간에 대한 SLA는 없습니다. 쿼리는 유사 열 (선호하는 데이터 유형에 따라 _PARTITIONTIME 또는 _PARTITIONDATE) 중 하나를 사용해 NULL 값을 __UNPARTITIONED__ 파티션에서 필터링함으로써 쿼리에서 스트리밍 버퍼의 데이터를 제외할 수 있습니다.

일별로 파티션을 나눈 테이블로 데이터를 스트리밍하는 경우 파티션 데코레이터를 insertAll 요청에 포함시켜 추가하면 날짜 추론을 재정의할 수 있습니다. tableId 매개변수에 데코레이터를 포함합니다. 예를 들어 table1 테이블에서 파티션 데코레이터를 사용하면 2021-03-01에 해당하는 파티션에 스트리밍할 수 있습니다.

table1$20210301

파티션 데코레이터를 사용하여 스트리밍하는 경우 현재 UTC 시간을 기준으로 지난 31일 및 향후 16일 이내의 파티션으로 스트리밍할 수 있습니다. 이러한 허용 범위 이외의 날짜에 파티션에 기록하려면 파티션을 나눈 테이블 데이터에 추가 및 덮어쓰기에 설명된 대로 로드 또는 쿼리 작업을 사용합니다.

파티션 데코레이터를 사용한 스트리밍은 매일 파티션을 나눈 테이블에만 지원됩니다. 시간별, 월간 또는 연간 파티션을 나눈 테이블에서는 지원되지 않습니다.

테스트에는 bq 명령줄 도구 bq insert CLI 명령어를 사용하면 됩니다. 예를 들어 다음 명령어는 2017년 1월 1일($20170101) 날짜의 파티션에 대한 단일 행을 mydataset.mytable이라는 파티션을 나눈 테이블로 스트리밍합니다.

echo '{"a":1, "b":2}' | bq insert 'mydataset.mytable$20170101'

시간 단위 열로 파티션 나누기

지난 5년부터 향후 1년 이내의 DATE, DATETIME, TIMESTAMP 열을 기준으로 파티션을 나눈 테이블로 데이터를 스트리밍할 수 있습니다. 이 범위를 벗어나는 데이터는 거부됩니다.

데이터가 스트리밍되면 처음에는 __UNPARTITIONED__ 파티션에 배치됩니다. 파티션으로 나누지 않은 데이터가 충분한 경우 BigQuery는 자동으로 데이터를 다시 파티션 나누기하여 적절한 파티션에 배치합니다. 하지만 __UNPARTITIONED__ 파티션에서 데이터를 이동하는 데 걸리는 시간에 대한 SLA는 없습니다.

  • 참고: 일별 파티션은 시간별, 월간, 연간 파티션과 다르게 처리됩니다. 기간(지난 7일에서 향후 3일)을 벗어나는 데이터만 UNPARTITIONED 파티션으로 추출되고 다시 파티션을 나누기를 대기합니다. 반면 시간별로 파티션을 나눈 테이블의 경우 데이터가 항상 UNPARTITIONED 파티션으로 추출되고 나중에 다시 파티션을 나눕니다.

템플릿 테이블을 사용하여 자동으로 테이블 만들기

템플릿 테이블은 논리적 테이블을 더 작은 여러 테이블로 분할하여 더 작은 데이터 집합을 만드는 메커니즘을 제공합니다(예: 사용자 ID). 템플릿 테이블에는 아래 설명된 여러 제한 사항이 포함됩니다. 대신 파티션을 나눈 테이블클러스터링된 테이블이 동작을 수행하기 위해 권장되는 방법입니다.

BigQuery API를 통해 템플릿 테이블을 사용하려면 insertAll 요청에 templateSuffix 매개변수를 추가합니다. bq 명령줄 도구의 경우 insert 명령어에 template_suffix 플래그를 추가합니다. BigQuery는 templateSuffix 매개변수 또는 template_suffix 플래그를 감지하면 대상 테이블을 기본 템플릿으로 취급합니다. 대상 테이블과 동일한 스키마를 공유하며 지정된 서픽스를 포함하는 이름이 있는 새 테이블을 만듭니다.

<targeted_table_name> + <templateSuffix>

템플릿 테이블을 사용하면 각 테이블을 개별적으로 만들고 각 테이블의 스키마를 지정하는 오버헤드가 방지됩니다. 하나의 템플릿만 만들고 다양한 서픽스를 제공하여 BigQuery에서 새 테이블을 만들 수 있도록 하면 됩니다. BigQuery는 동일한 프로젝트 및 데이터세트에 테이블을 배치합니다.

템플릿 테이블을 사용하여 만들어진 테이블은 일반적으로 몇 초 내에 사용할 수 있습니다. 드물지만 사용할 수 있게 되기까지 시간이 더 걸리는 경우도 있습니다.

템플릿 테이블 스키마 변경

템플릿 테이블 스키마를 변경하면 이후에 생성되는 모든 테이블은 업데이트된 스키마를 사용합니다. 기존 테이블에 스트리밍 버퍼가 여전히 있는 경우를 제외하고 이전에 생성된 테이블은 영향을 받지 않습니다.

아직 스트리밍 버퍼가 있는 기존 테이블의 경우 하위 호환 가능한 방식으로 템플릿 테이블 스키마를 수정하면 이러한 활성 스트리밍 생성 테이블의 스키마도 업데이트됩니다. 그러나 역호환되지 않는 방식으로 템플릿 테이블 스키마를 수정하는 경우 기존 스키마를 사용하는 버퍼링된 데이터는 손실됩니다. 또한 현재 호환되지 않는 이전 스키마를 사용하는 기존에 생성된 테이블로 새 데이터를 스트리밍할 수 없습니다.

템플릿 테이블 스키마를 변경한 후 쿼리 생성 테이블에 새 데이터를 삽입하기 전에 변경 사항이 전파될 때까지 기다리세요. 새 필드 삽입 요청은 몇 분 이내에 성공적으로 수행됩니다. 새 필드를 쿼리하려면 최대 90분까지 기다려야 할 수 있습니다.

생성된 테이블의 스키마를 변경하려는 경우, 템플릿 테이블을 통한 스트리밍이 중단되고 생성된 테이블의 스트리밍 통계 섹션이 tables.get() 응답에 없는 경우(테이블에 버퍼링된 데이터가 없음을 의미) 외에는 스키마를 변경하지 마세요.

파티션을 나눈 테이블클러스터링된 테이블은 앞의 제한 사항이 적용되지 않으며, 권장되는 메커니즘입니다.

템플릿 테이블 세부정보

템플릿 서픽스 값
templateSuffix(또는 --template_suffix) 값은 문자(a-z, A-Z), 숫자(0-9), 밑줄(_)만 포함해야 합니다. 테이블 이름과 테이블 서픽스를 결합한 최대 길이는 1,024자입니다.
할당량

템플릿 테이블에는 스트리밍 할당량 제한이 적용됩니다. 프로젝트는 tables.insert API와 비슷하게 템플릿 테이블을 사용하여 초당 최대 10개의 테이블을 만들 수 있습니다. 이 할당량은 수정되는 테이블이 아닌 생성되는 테이블에만 적용됩니다.

애플리케이션이 테이블을 초당 10개 넘게 만들어야 할 경우 클러스터링된 테이블을 사용하는 것이 좋습니다. 예를 들어 단일 클러스터링 테이블의 키 열에 높은 카디널리티 테이블 ID를 배치할 수 있습니다.

라이브까지의 시간

생성된 테이블은 데이터 세트에서 만료 시간을 상속합니다. 일반적인 스트리밍 데이터와 마찬가지로 생성된 테이블을 즉시 복사할 수는 없습니다.

중복 삭제

중복 삭제는 대상 테이블에 대한 동일한 참조 간에만 발생합니다. 예를 들어 템플릿 테이블과 정규 insertAll 명령어를 모두 사용하여 생성된 테이블로 동시에 스트리밍하는 경우 템플릿 테이블과 정규 insertAll 명령어로 삽입된 행 간에는 중복 삭제가 발생하지 않습니다.

템플릿 테이블과 생성된 테이블은 뷰일 수 없습니다.

스트리밍 삽입 문제 해결

다음 섹션에서는 기존 스트리밍 API를 사용하여 BigQuery로 데이터를 스트리밍할 때 발생하는 오류를 해결하는 방법을 설명합니다. 스트리밍 삽입의 할당량 오류를 해결하는 방법에 대한 자세한 내용은 스트리밍 삽입 할당량 오류를 참조하세요.

실패 HTTP 응답 코드

네트워크 오류와 같은 실패 HTTP 응답 코드를 받는 경우 스트리밍 삽입이 성공적으로 수행되었는지 여부를 확인할 방법이 없습니다. 요청을 다시 전송하려고 시도하는 경우 테이블에 중복된 행이 발생할 수 있습니다. 테이블에서 중복이 발생하지 않도록 하려면 요청을 보낼 때 insertId 속성을 설정합니다. BigQuery는 중복 삭제에 insertId 속성을 사용합니다.

권한 오류, 잘못된 테이블 이름 오류 또는 할당량 초과 오류를 받는 경우 행이 삽입되지 않으며 전체 요청이 실패합니다.

성공 HTTP 응답 코드

BigQuery에서 부분적으로만 행 삽입에 성공하는 경우도 있으므로 성공 HTTP 응답 코드를 받더라도 응답의 insertErrors 속성을 확인하여 행 삽입이 성공적으로 수행되었는지 여부를 판단해야 합니다. 다음 시나리오 중 하나가 발생할 수 있습니다.

  • 모든 행이 삽입되었습니다. insertErrors 속성이 빈 목록인 경우 모든 행이 성공적으로 삽입된 것입니다.
  • 일부 행이 삽입되었습니다. 행에 스키마 불일치가 있는 경우를 제외하고 insertErrors 속성에 표시된 행은 삽입되지 않았고 다른 모든 행은 성공적으로 삽입되었습니다. errors 속성에는 성공하지 못한 행의 실패 이유에 대한 자세한 정보가 포함되어 있습니다. index 속성은 오류가 적용되는 요청의 0 기반 행 색인을 나타냅니다.
  • 성공적으로 삽입된 행이 없습니다. BigQuery에서 요청의 개별 행에 스키마 불일치가 발생하는 경우 아무런 행이 삽입되지 않으며 행에 스키마 불일치가 없더라도 각 행에 insertErrors 항목이 반환됩니다. 스키마 불일치가 없는 행의 경우 reason 속성이 stopped로 설정된 오류가 있으며 현재 상태 그대로 다시 보낼 수 있습니다. 실패한 행에는 스키마 불일치에 대한 자세한 정보가 포함됩니다. 각 BigQuery 데이터 유형에 지원되는 프로토콜 버퍼 유형에 대해 알아보려면 데이터 유형 변환을 참조하세요.

스트리밍 삽입의 메타데이터 오류

BigQuery의 스트리밍 API는 높은 삽입 비율에 맞게 고안되었으므로 스트리밍 시스템과 상호작용 시 기반 테이블 메타데이터 노출의 수정은 eventual consistency를 갖게 됩니다. 대부분의 경우 메타데이터 변경은 몇 분 이내에 전파되지만 이 시간 동안 API 응답은 비일관적인 테이블 상태를 반영할 수 있습니다.

일부 시나리오에는 다음이 포함됩니다.

  • 스키마 변경사항. 스트리밍 시스템은 스키마 변경을 즉각 적용하지 않을 수 있으므로 최근 스트리밍 삽입을 수신한 테이블의 스키마를 수정하는 경우 스키마 불일치 오류 응답이 발생할 수 있습니다.
  • 테이블 만들기/삭제. 존재하지 않는 테이블로 스트리밍하면 notFound 응답 변형이 반환됩니다. 응답으로 생성된 테이블은 후속 스트리밍 삽입에서 즉시 인식되지 않을 수 있습니다. 마찬가지로 테이블을 삭제하거나 다시 만들어도 스트리밍 삽입이 실제로 이전 테이블로 전달되는 기간이 발생할 수 있습니다. 스트리밍 삽입이 새 테이블에 존재하지 않을 수 있습니다.
  • 테이블 잘림. 마찬가지로 WRITE_TRUNCATE의 writeDisposition을 사용하는 쿼리 작업을 사용해서 테이블 데이터를 자르면 일관성 기간 동안 후속 삽입이 누락될 수 있습니다.

데이터 누락/사용 불가

스트리밍 삽입은 일시적으로 쓰기 최적화 스토리지에 위치하는데, 스트리밍 버퍼의 가용성 특성은 관리 스토리지와는 다릅니다. 테이블 복사 작업 및 tabledata.list와 같은 API 메서드 등 BigQuery의 특정 작업은 쓰기 최적화 스토리지와 상호작용하지 않습니다. 최근 스트리밍 데이터가 대상 테이블 또는 출력에 존재하지 않습니다.