將 Blob Storage 資料載入 BigQuery

您可以使用 Blob Storage 專用 BigQuery 資料移轉服務連接器,將資料從 Blob Storage 載入至 BigQuery。您可以使用 BigQuery 資料移轉服務,安排週期性移轉工作,將 Blob Storage 中的最新資料新增至 BigQuery。

事前準備

建立 Blob 儲存體資料移轉作業前,請先執行下列操作:

所需權限

如要建立 Blob Storage 資料移轉,您必須具備 bigquery.transfers.update 身分與存取權管理 (IAM) 權限。您也需要目標資料集的 bigquery.datasets.getbigquery.datasets.update 權限。

預先定義的 bigquery.admin IAM 角色包含建立 Blob 儲存資料移轉作業所需的權限。

如要進一步瞭解 BigQuery 身分與存取權管理,請參閱「使用身分與存取權管理功能控管存取權」。

如要確認您在 Blob Storage 中擁有正確的權限,以便啟用資料移轉功能,請參閱「共用存取權簽章 (SAS)」。

如果您想要為 Pub/Sub 設定移轉執行通知,必須具備 pubsub.topics.setIamPolicy 權限。您只想設定電子郵件通知,就不需要 Pub/Sub 權限。詳情請參閱「BigQuery 資料移轉服務執行通知」。

限制

Blob Storage 資料移轉作業有下列限制:

設定 Blob 儲存體資料移轉作業

選取下列選項之一:

主控台

  1. 前往 Google Cloud 控制台的「資料移轉」頁面。

    前往「資料移轉」

  2. 按一下 「建立移轉作業」

  3. 在「Create transfer」(建立轉移作業)頁面執行下列操作:

    • 在「Source type」(來源類型) 部分,「Source」(來源) 請選取「Azure Blob Storage」(Azure Blob 儲存體)

      移轉來源類型

    • 在「Transfer config name」(轉移設定名稱) 部分,「Display name」(顯示名稱) 請輸入資料移轉作業名稱。

    • 在「Schedule options」(排程選項) 部分,請執行下列操作:

      • 選取重複頻率。如果選取「Hours」(小時)、「Days」(天)、「Weeks」(週)或「Months」(月),必須一併指定頻率。您也可以選取「Custom」(自訂),指定重複頻率。如果選取「On-demand」(隨選),這項資料移轉作業會在您手動觸發後執行。
      • 視情況選取「Start now」(立即開始) 或「Start at set time」(在所設時間開始執行),並提供開始日期和執行時間。
    • 在「Destination settings」(目的地設定) 部分,「Dataset」(資料集) 請選取您為了儲存資料而建立的資料集。

    • 在「Data source details」(資料來源詳細資料) 部分執行下列操作:

      • 在「Destination table」(目的地資料表),輸入您為了在 BigQuery 儲存資料而建立的資料表名稱。目的地資料表名稱支援參數
      • 在「Azure storage account name」(Azure 儲存體帳戶名稱),輸入 Blob 儲存體帳戶名稱。
      • 在「Container name」(容器名稱),輸入 Blob 儲存體容器名稱。
      • 在「Data path」(資料路徑) 部分輸入路徑,篩選出要移轉的檔案。 查看示例
      • 在「SAS token」(SAS 權杖) 部分輸入 Azure SAS 權杖。
      • 在「File format」(檔案格式) 選取來源資料格式。
      • 在「Write disposition」(寫入配置) 部分,選取 WRITE_APPEND 可以陸續將新的資料附加至目的地資料表;選取 WRITE_TRUNCATE 則可在每次移轉資料時覆寫目的地資料表資料。「Write disposition」(寫入配置) 的預設值為 WRITE_APPEND

      如要進一步瞭解 BigQuery 資料移轉服務如何使用 WRITE_APPENDWRITE_TRUNCATE 擷取資料,請參閱 Azure Blob 移轉作業資料擷取的相關說明。如要進一步瞭解 writeDisposition 欄位,請參閱 JobConfigurationLoad

      資料來源詳細資料

    • 在「Transfer options」(移轉作業選項) 部分執行下列操作:

      • 在「Number of errors allowed」(允許的錯誤數量) 部分,輸入可以忽略的損壞記錄數量上限 (整數值),預設值為 0。
      • (選用步驟) 在「Decimal target types」(小數目標類型) 部分,輸入以半形逗號分隔的清單,內含來源資料內小數值可能轉換成的 SQL 資料類型。系統會依據下列條件,選取要轉換的 SQL 資料類型:
        • 系統會按照 NUMERICBIGNUMERICSTRING 的順序,選取指定清單中支援的有效位數和小數位數類型。
        • 如果清單中的資料類型都不支援有效位數和小數位數,則會選取指定清單中支援範圍最廣的資料類型。如果讀取來源資料時,值超過支援的範圍,就會擲回錯誤。
        • 資料類型 STRING 支援所有有效位數和小數位數值。
        • 如果將這個欄位留空,ORC 的預設資料類型為 NUMERIC,STRING,其他檔案格式則為 NUMERIC
        • 這個欄位不得含有重複的資料類型。
        • 您提供資料類型時採用的順序不會有影響。
    • 如果您選取的檔案格式為 CSV 或 JSON,請在「JSON, CSV」(JSON、CSV) 部分勾選「Ignore unknown values」(略過不明的值),接受所含值不符合結構定義的資料列。

    • 如果您選取的檔案格式為 CSV,請在「CSV」部分針對要載入的資料輸入額外的 CSV 選項

      CSV 選項

    • 在「Notification options」(通知選項) 部分,您可以選擇啟用電子郵件通知和 Pub/Sub 通知。

      • 啟用電子郵件通知之後,若移轉失敗,移轉作業管理員就會收到電子郵件通知。
      • 啟用 Pub/Sub 通知時,請選取要發布的主題名稱,或是點選「Create a topic」(建立主題) 來建立主題。
    • 如果使用 CMEK,請在「Advanced options」(進階選項) 部分選取「Customer-managed key」(客戶管理的金鑰)。畫面隨即會列出可用的 CMEK 供您選擇。如要瞭解 CMEK 如何與 BigQuery 資料移轉服務搭配運作,請參閱指定移轉作業加密金鑰的相關說明。

  4. 按一下 [儲存]

bq

使用 bq mk --transfer_config 指令建立 Blob 儲存體轉移作業:

bq mk \
  --transfer_config \
  --project_id=PROJECT_ID \
  --data_source=DATA_SOURCE \
  --display_name=DISPLAY_NAME \
  --target_dataset=DATASET \
  --destination_kms_key=DESTINATION_KEY \
  --params=PARAMETERS

更改下列內容:

  • PROJECT_ID:(選用) 包含目標資料集的專案 ID。如果未指定,系統會使用預設專案。
  • DATA_SOURCEazure_blob_storage
  • DISPLAY_NAME:資料移轉設定的顯示名稱。移轉作業名稱可以是任意值,日後需要修改移轉作業時能夠據此識別。
  • DATASET:資料移轉設定的目標資料集。
  • DESTINATION_KEY:(選用) Cloud KMS 金鑰資源 ID,例如 projects/project_name/locations/us/keyRings/key_ring_name/cryptoKeys/key_name
  • PARAMETERS:資料移轉設定的參數,以 JSON 格式列出。例如:--params={"param1":"value1", "param2":"value2"}。以下是 Blob 儲存體資料移轉作業的參數:
    • destination_table_name_template:必填。目標資料表的名稱。
    • storage_account:必填。Blob 儲存體帳戶名稱。
    • container:必填。Blob 儲存體容器名稱。
    • data_path:選用。篩選要移轉的檔案路徑。請參閱示例
    • sas_token:必填。Azure SAS 權杖。
    • file_format:選用。您要移轉的檔案類型:CSVJSONAVROPARQUETORC。預設值為 CSV
    • write_disposition:選用。選取 WRITE_APPEND 可將資料附加至目的地資料表,選取 WRITE_TRUNCATE 則可覆寫目的地資料表中的資料。預設值為 WRITE_APPEND
    • max_bad_records:選用。允許的損壞記錄數量。預設值為 0。
    • decimal_target_types:選用。以半形逗號分隔的清單,內含來源資料內小數值可能轉換成的 SQL 資料類型。如果未提供這個欄位,ORC 的預設資料類型為 NUMERIC,STRING,其他檔案格式則為 NUMERIC
    • ignore_unknown_values:選用,如果 file_format 不是 JSONCSV,則會略過。將其設為 true 可接受包含不符合結構定義的值的資料列。
    • field_delimiter:選用,僅在 file_formatCSV 時才會套用。分隔欄位的字元。預設值為 ,
    • skip_leading_rows:選用,僅在 file_formatCSV 時才會套用。指出不想匯入的標題列數量。預設值為 0。
    • allow_quoted_newlines:選用,僅在 file_formatCSV 時才會套用。指出是否允許在引用欄位中使用換行符號。
    • allow_jagged_rows:選用,僅在 file_formatCSV 時才會套用。指出是否接受缺少結尾選用欄的資料列。缺少的值會以 NULL 填入。

舉例來說,以下指令會建立名為 mytransfer 的 Blob Storage 資料移轉作業:

bq mk \
  --transfer_config \
  --data_source=azure_blob_storage \
  --display_name=mytransfer \
  --target_dataset=mydataset \
  --destination_kms_key=projects/myproject/locations/us/keyRings/mykeyring/cryptoKeys/key1
  --params={"destination_table_name_template":"mytable",
      "storage_account":"myaccount",
      "container":"mycontainer",
      "data_path":"myfolder/*.csv",
      "sas_token":"my_sas_token_value",
      "file_format":"CSV",
      "max_bad_records":"1",
      "ignore_unknown_values":"true",
      "field_delimiter":"|",
      "skip_leading_rows":"1",
      "allow_quoted_newlines":"true",
      "allow_jagged_rows":"false"}

API

請使用 projects.locations.transferConfigs.create 方法,並提供 TransferConfig 資源的執行個體。

Java

在嘗試這個範例之前,請先按照 BigQuery 快速入門:使用用戶端程式庫中的 Java 設定說明進行操作。詳情請參閱 BigQuery Java API 參考說明文件

如要向 BigQuery 進行驗證,請設定應用程式預設憑證。詳情請參閱「設定用戶端程式庫的驗證機制」。


import com.google.api.gax.rpc.ApiException;
import com.google.cloud.bigquery.datatransfer.v1.CreateTransferConfigRequest;
import com.google.cloud.bigquery.datatransfer.v1.DataTransferServiceClient;
import com.google.cloud.bigquery.datatransfer.v1.ProjectName;
import com.google.cloud.bigquery.datatransfer.v1.TransferConfig;
import com.google.protobuf.Struct;
import com.google.protobuf.Value;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Sample to create azure blob storage transfer config.
public class CreateAzureBlobStorageTransfer {

  public static void main(String[] args) throws IOException {
    // TODO(developer): Replace these variables before running the sample.
    final String projectId = "MY_PROJECT_ID";
    final String displayName = "MY_TRANSFER_DISPLAY_NAME";
    final String datasetId = "MY_DATASET_ID";
    String tableId = "MY_TABLE_ID";
    String storageAccount = "MY_AZURE_STORAGE_ACCOUNT_NAME";
    String containerName = "MY_AZURE_CONTAINER_NAME";
    String dataPath = "MY_AZURE_FILE_NAME_OR_PREFIX";
    String sasToken = "MY_AZURE_SAS_TOKEN";
    String fileFormat = "CSV";
    String fieldDelimiter = ",";
    String skipLeadingRows = "1";
    Map<String, Value> params = new HashMap<>();
    params.put(
        "destination_table_name_template", Value.newBuilder().setStringValue(tableId).build());
    params.put("storage_account", Value.newBuilder().setStringValue(storageAccount).build());
    params.put("container", Value.newBuilder().setStringValue(containerName).build());
    params.put("data_path", Value.newBuilder().setStringValue(dataPath).build());
    params.put("sas_token", Value.newBuilder().setStringValue(sasToken).build());
    params.put("file_format", Value.newBuilder().setStringValue(fileFormat).build());
    params.put("field_delimiter", Value.newBuilder().setStringValue(fieldDelimiter).build());
    params.put("skip_leading_rows", Value.newBuilder().setStringValue(skipLeadingRows).build());
    createAzureBlobStorageTransfer(projectId, displayName, datasetId, params);
  }

  public static void createAzureBlobStorageTransfer(
      String projectId, String displayName, String datasetId, Map<String, Value> params)
      throws IOException {
    TransferConfig transferConfig =
        TransferConfig.newBuilder()
            .setDestinationDatasetId(datasetId)
            .setDisplayName(displayName)
            .setDataSourceId("azure_blob_storage")
            .setParams(Struct.newBuilder().putAllFields(params).build())
            .setSchedule("every 24 hours")
            .build();
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests.
    try (DataTransferServiceClient client = DataTransferServiceClient.create()) {
      ProjectName parent = ProjectName.of(projectId);
      CreateTransferConfigRequest request =
          CreateTransferConfigRequest.newBuilder()
              .setParent(parent.toString())
              .setTransferConfig(transferConfig)
              .build();
      TransferConfig config = client.createTransferConfig(request);
      System.out.println("Azure Blob Storage transfer created successfully: " + config.getName());
    } catch (ApiException ex) {
      System.out.print("Azure Blob Storage transfer was not created." + ex.toString());
    }
  }
}

使用轉移作業指定加密金鑰

您可以指定客戶管理的加密金鑰 (CMEK),為轉移作業加密資料。您可以使用 CMEK 支援從 Azure Blob Storage 進行的轉移作業。

當您在移轉作業中指定 CMEK 時,BigQuery 資料移轉服務會將 CMEK 套用至任何擷取資料的磁碟上中繼快取,讓整個資料移轉工作流程符合 CMEK 規範。

如果轉移作業並非使用 CMEK 建立,您就無法更新現有轉移作業來新增 CMEK。舉例來說,您無法將原本預設加密的目的資料表變更為使用 CMEK 加密。反之,您也無法將 CMEK 加密目的地資料表變更為其他類型的加密。

如果移轉設定最初是使用 CMEK 加密功能建立,您可以更新移轉作業的 CMEK。更新移轉設定的 CMEK 後,BigQuery 資料移轉服務會在下次執行移轉作業時,將 CMEK 傳播至目的地資料表,並在移轉作業執行期間,將任何過期的 CMEK 替換為新的 CMEK。詳情請參閱「更新轉移作業」。

您也可以使用專案預設鍵。在指定含有移轉作業的專案預設鍵時,BigQuery 資料移轉服務會將該專案預設鍵用於任何新移轉設定的預設鍵。

排解轉移設定問題

如果您在設定資料移轉作業時遇到問題,請參閱Blob 儲存體轉移問題

後續步驟