批次匯入特徵值

批次匯入功能可讓您從有效資料來源大量匯入特徵值。在批次匯入要求中,您最多可以為實體類型匯入 100 個特徵的值。請注意,為避免發生衝突,每個實體類型只能執行一個批次匯入工作。

在批次匯入要求中,指定來源資料的位置,以及來源資料如何對應至 featurestore 中的特徵。由於每個批次匯入要求都只適用於單一實體類型,因此來源資料也必須是單一實體類型。

匯入成功後,後續的讀取作業即可使用特徵值。

匯入工作效能

Vertex AI 特徵儲存庫 (舊版) 提供高輸送量匯入功能,但最低延遲時間可能需要幾分鐘。對 Vertex AI 特徵儲存庫 (舊版) 的每項要求都會啟動工作來完成作業。即使只匯入一筆記錄,匯入工作也需要幾分鐘才能完成。

如要調整作業的執行方式,請變更下列兩個變數:

  • 特徵儲存庫線上供應節點數量。
  • 匯入工作使用的工作站數量。工作人員會處理資料,並將資料寫入 featurestore。

建議的工作站數量為每 10 個線上供應節點配備 1 個工作站。如果線上服務負載較低,可以提高上限。最多可以指定 100 個工作站。如需更多指引,請參閱「監控及相應調整資源,以最佳化批次匯入作業」。

如果線上服務叢集資源不足,匯入工作可能會失敗。如果匯入失敗,請在線上供應負載較低時重試匯入要求,或增加特徵商店的節點數量,然後重試要求。

如果特徵商店沒有線上商店 (零個線上服務節點),匯入工作只會寫入離線商店,且工作效能完全取決於匯入工作站數量。

資料一致性

如果在匯入期間修改來源資料,可能會導致資料不一致。請務必先完成所有來源資料修改,再開始匯入作業。此外,重複的特徵值可能會導致線上和批次要求之間提供不同的值。請確保每個實體 ID 和時間戳記組合都有一個特徵值。

如果匯入作業失敗,特徵商店可能只會取得部分資料,導致線上和批次服務要求傳回的值不一致。為避免發生這種不一致的情況,請再次嘗試相同的匯入要求,並等待要求順利完成。

空值和空陣列

匯入期間,Vertex AI 特徵儲存庫 (舊版) 會將空純量值或空陣列視為空值。包括 CSV 欄中的空白值。Vertex AI 特徵儲存庫 (舊版) 不支援非純量空值,例如陣列中的 null 值。

在線上提供和批次提供期間,Vertex AI 特徵儲存庫 (舊版) 會傳回特徵的最新非空值。如果沒有特徵的歷史值,Vertex AI 特徵儲存庫 (舊版) 會傳回 null

NaN 值

Vertex AI 特徵儲存庫 (舊版) 支援 DoubleDoubleArray 中的 NaN (非數字) 值。匯入期間,您可以在放送輸入 CSV 檔案中輸入 NaN,代表 NaN 值。在線上服務和批次服務期間,Vertex AI 特徵儲存庫 (舊版) 會針對 NaN 值傳回 NaN

批次匯入

將大量值匯入特徵儲存庫,供單一實體類型的一或多項特徵使用。

網路使用者介面

  1. 在 Google Cloud 控制台的 Vertex AI 專區,前往「Features」頁面。

    前往「Features」(功能) 頁面

  2. 從「Region」(區域) 下拉式清單中選取一個區域。
  3. 在特徵表格中,查看「實體類型」欄,找出包含要匯入值的特徵的實體類型。
  4. 按一下實體類型名稱。
  5. 在動作列中,按一下「Ingest values」(擷取值)
  6. 針對「資料來源」,選取下列其中一個選項:
    • Cloud Storage CSV 檔案:選取這個選項,即可從 Cloud Storage 匯入多個 CSV 檔案的資料。指定 CSV 檔案的路徑和名稱。如要指定其他檔案,請按一下「新增其他檔案」
    • Cloud Storage AVRO 檔案:選取這個選項,從 Cloud Storage 的 AVRO 檔案匯入資料。指定 AVRO 檔案的路徑和名稱。
    • BigQuery 資料表:選取這個選項,從 BigQuery 資料表或 BigQuery 檢視區塊匯入資料。瀏覽並選取要使用的資料表或檢視區塊,格式如下: PROJECT_ID.DATASET_ID.TABLE_ID
  7. 按一下「繼續」
  8. 在「將資料欄對應至特徵」中,指定來源資料中的哪些資料欄會對應至特徵商店中的實體和特徵。
    1. 指定來源資料中包含實體 ID 的資料欄名稱。
    2. 如要指定時間戳記,請在來源資料中指定時間戳記資料欄,或指定與匯入的所有特徵值相關聯的單一時間戳記。
    3. 在特徵清單中,輸入對應至各項特徵的來源資料欄名稱。根據預設,Vertex AI 特徵儲存庫 (舊版) 會假設特徵名稱和欄名相符。
  9. 按一下「擷取」

REST

如要匯入現有特徵的特徵值,請使用 featurestores.entityTypes.importFeatureValues 方法傳送 POST 要求。請注意,如果來源資料欄的名稱與目的地特徵 ID 不同,請加入 sourceField 參數。

使用任何要求資料之前,請先替換以下項目:

  • LOCATION_ID:建立特徵商店的區域。例如:us-central1
  • PROJECT_ID:您的專案 ID
  • FEATURESTORE_ID:特徵商店的 ID。
  • ENTITY_TYPE_ID:實體類型 ID。
  • ENTITY_SOURCE_COLUMN_ID:包含實體 ID 的來源資料欄 ID。
  • FEATURE_TIME_ID:來源資料欄的 ID,內含特徵值的特徵時間戳記。
  • FEATURE_ID:要匯入值的特徵商店中現有特徵的 ID。
  • FEATURE_SOURCE_COLUMN_ID:來源資料欄的 ID,內含實體的特徵值。
  • SOURCE_DATA_DETAILS:來源資料位置,也表示格式,例如 BigQuery 資料表或 BigQuery 檢視區塊的 "bigquerySource": { "inputUri": "bq://test.dataset.sourcetable" }
  • WORKER_COUNT:用來將資料寫入 Feature Store 的工作人員人數。

HTTP 方法和網址:

POST https://LOCATION_ID-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION_ID/featurestores/FEATURESTORE_ID/entityTypes/ENTITY_TYPE_ID:importFeatureValues

JSON 要求主體:

{
  "entityIdField": "ENTITY_SOURCE_COLUMN_ID",
  "featureTimeField": "FEATURE_TIME_ID",
  SOURCE_DATA_DETAILS,
  "featureSpecs": [{
    "id": "FEATURE_ID",
    "sourceField": "FEATURE_SOURCE_COLUMN_ID"
  }],
  "workerCount": WORKER_COUNT
}

如要傳送要求,請選擇以下其中一個選項:

curl

將要求主體儲存在名為 request.json 的檔案中,然後執行下列指令:

curl -X POST \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json; charset=utf-8" \
-d @request.json \
"https://LOCATION_ID-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION_ID/featurestores/FEATURESTORE_ID/entityTypes/ENTITY_TYPE_ID:importFeatureValues"

PowerShell

將要求主體儲存在名為 request.json 的檔案中,然後執行下列指令:

$cred = gcloud auth print-access-token
$headers = @{ "Authorization" = "Bearer $cred" }

Invoke-WebRequest `
-Method POST `
-Headers $headers `
-ContentType: "application/json; charset=utf-8" `
-InFile request.json `
-Uri "https://LOCATION_ID-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION_ID/featurestores/FEATURESTORE_ID/entityTypes/ENTITY_TYPE_ID:importFeatureValues" | Select-Object -Expand Content

畫面會顯示類似以下的輸出。您可以使用回應中的 OPERATION_ID 取得作業狀態

{
  "name": "projects/PROJECT_NUMBER/locations/LOCATION_ID/featurestores/FEATURESTORE_ID/entityTypes/ENTITY_TYPE_ID/operations/OPERATION_ID",
  "metadata": {
    "@type": "type.googleapis.com/google.cloud.aiplatform.v1.ImportFeatureValuesOperationMetadata",
    "genericMetadata": {
      "createTime": "2021-03-02T00:04:13.039166Z",
      "updateTime": "2021-03-02T00:04:13.039166Z"
    }
  }
}

Python

如要瞭解如何安裝或更新 Python 適用的 Vertex AI SDK,請參閱「安裝 Python 適用的 Vertex AI SDK」。 詳情請參閱 Python API 參考說明文件

import datetime
from typing import List, Union

from google.cloud import aiplatform


def import_feature_values_sample(
    project: str,
    location: str,
    entity_type_id: str,
    featurestore_id: str,
    feature_ids: List[str],
    feature_time: Union[str, datetime.datetime],
    gcs_source_uris: Union[str, List[str]],
    gcs_source_type: str,
):

    aiplatform.init(project=project, location=location)

    my_entity_type = aiplatform.featurestore.EntityType(
        entity_type_name=entity_type_id, featurestore_id=featurestore_id
    )

    my_entity_type.ingest_from_gcs(
        feature_ids=feature_ids,
        feature_time=feature_time,
        gcs_source_uris=gcs_source_uris,
        gcs_source_type=gcs_source_type,
    )

Java

在試用這個範例之前,請先按照Java使用用戶端程式庫的 Vertex AI 快速入門中的操作說明進行設定。 詳情請參閱 Vertex AI Java API 參考說明文件

如要向 Vertex AI 進行驗證,請設定應用程式預設憑證。 詳情請參閱「為本機開發環境設定驗證」。


import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.aiplatform.v1.AvroSource;
import com.google.cloud.aiplatform.v1.EntityTypeName;
import com.google.cloud.aiplatform.v1.FeaturestoreServiceClient;
import com.google.cloud.aiplatform.v1.FeaturestoreServiceSettings;
import com.google.cloud.aiplatform.v1.GcsSource;
import com.google.cloud.aiplatform.v1.ImportFeatureValuesOperationMetadata;
import com.google.cloud.aiplatform.v1.ImportFeatureValuesRequest;
import com.google.cloud.aiplatform.v1.ImportFeatureValuesRequest.FeatureSpec;
import com.google.cloud.aiplatform.v1.ImportFeatureValuesResponse;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ImportFeatureValuesSample {

  public static void main(String[] args)
      throws IOException, InterruptedException, ExecutionException, TimeoutException {
    // TODO(developer): Replace these variables before running the sample.
    String project = "YOUR_PROJECT_ID";
    String featurestoreId = "YOUR_FEATURESTORE_ID";
    String entityTypeId = "YOUR_ENTITY_TYPE_ID";
    String entityIdField = "YOUR_ENTITY_FIELD_ID";
    String featureTimeField = "YOUR_FEATURE_TIME_FIELD";
    String gcsSourceUri = "YOUR_GCS_SOURCE_URI";
    int workerCount = 2;
    String location = "us-central1";
    String endpoint = "us-central1-aiplatform.googleapis.com:443";
    int timeout = 300;
    importFeatureValuesSample(
        project,
        featurestoreId,
        entityTypeId,
        gcsSourceUri,
        entityIdField,
        featureTimeField,
        workerCount,
        location,
        endpoint,
        timeout);
  }

  static void importFeatureValuesSample(
      String project,
      String featurestoreId,
      String entityTypeId,
      String gcsSourceUri,
      String entityIdField,
      String featureTimeField,
      int workerCount,
      String location,
      String endpoint,
      int timeout)
      throws IOException, InterruptedException, ExecutionException, TimeoutException {
    FeaturestoreServiceSettings featurestoreServiceSettings =
        FeaturestoreServiceSettings.newBuilder().setEndpoint(endpoint).build();

    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources.
    try (FeaturestoreServiceClient featurestoreServiceClient =
        FeaturestoreServiceClient.create(featurestoreServiceSettings)) {
      List<FeatureSpec> featureSpecs = new ArrayList<>();

      featureSpecs.add(FeatureSpec.newBuilder().setId("title").build());
      featureSpecs.add(FeatureSpec.newBuilder().setId("genres").build());
      featureSpecs.add(FeatureSpec.newBuilder().setId("average_rating").build());
      ImportFeatureValuesRequest importFeatureValuesRequest =
          ImportFeatureValuesRequest.newBuilder()
              .setEntityType(
                  EntityTypeName.of(project, location, featurestoreId, entityTypeId).toString())
              .setEntityIdField(entityIdField)
              .setFeatureTimeField(featureTimeField)
              .addAllFeatureSpecs(featureSpecs)
              .setWorkerCount(workerCount)
              .setAvroSource(
                  AvroSource.newBuilder()
                      .setGcsSource(GcsSource.newBuilder().addUris(gcsSourceUri)))
              .build();
      OperationFuture<ImportFeatureValuesResponse, ImportFeatureValuesOperationMetadata>
          importFeatureValuesFuture =
              featurestoreServiceClient.importFeatureValuesAsync(importFeatureValuesRequest);
      System.out.format(
          "Operation name: %s%n", importFeatureValuesFuture.getInitialFuture().get().getName());
      System.out.println("Waiting for operation to finish...");
      ImportFeatureValuesResponse importFeatureValuesResponse =
          importFeatureValuesFuture.get(timeout, TimeUnit.SECONDS);
      System.out.println("Import Feature Values Response");
      System.out.println(importFeatureValuesResponse);
      featurestoreServiceClient.close();
    }
  }
}

Node.js

在試用這個範例之前,請先按照Node.js使用用戶端程式庫的 Vertex AI 快速入門中的操作說明進行設定。 詳情請參閱 Vertex AI Node.js API 參考說明文件

如要向 Vertex AI 進行驗證,請設定應用程式預設憑證。 詳情請參閱「為本機開發環境設定驗證」。

/**
 * TODO(developer): Uncomment these variables before running the sample.\
 * (Not necessary if passing values as arguments)
 */

// const project = 'YOUR_PROJECT_ID';
// const featurestoreId = 'YOUR_FEATURESTORE_ID';
// const entityTypeId = 'YOUR_ENTITY_TYPE_ID';
// const avroGcsUri = 'AVRO_FILE_IN_THE_GCS_URI';
// const entityIdField = 'ENTITY_ID_FIELD_IN_AVRO';
// const featureTimeField = 'TIMESTAMP_FIELD_IN_AVRO';
// const workerCount = <NO_OF_WORKERS_FOR_INGESTION_JOB>;
// const location = 'YOUR_PROJECT_LOCATION';
// const apiEndpoint = 'YOUR_API_ENDPOINT';
// const timeout = <TIMEOUT_IN_MILLI_SECONDS>;

// Imports the Google Cloud Featurestore Service Client library
const {FeaturestoreServiceClient} = require('@google-cloud/aiplatform').v1;

// Specifies the location of the api endpoint
const clientOptions = {
  apiEndpoint: apiEndpoint,
};

// Instantiates a client
const featurestoreServiceClient = new FeaturestoreServiceClient(
  clientOptions
);

async function importFeatureValues() {
  // Configure the entityType resource
  const entityType = `projects/${project}/locations/${location}/featurestores/${featurestoreId}/entityTypes/${entityTypeId}`;

  const avroSource = {
    gcsSource: {
      uris: [avroGcsUri],
    },
  };

  const featureSpecs = [{id: 'age'}, {id: 'gender'}, {id: 'liked_genres'}];

  const request = {
    entityType: entityType,
    avroSource: avroSource,
    entityIdField: entityIdField,
    featureSpecs: featureSpecs,
    featureTimeField: featureTimeField,
    workerCount: Number(workerCount),
  };

  // Import Feature Values Request
  const [operation] = await featurestoreServiceClient.importFeatureValues(
    request,
    {timeout: Number(timeout)}
  );
  const [response] = await operation.promise();

  console.log('Import feature values response');
  console.log('Raw response:');
  console.log(JSON.stringify(response, null, 2));
}
importFeatureValues();

查看匯入工作

您可以使用 Google Cloud 控制台,查看Google Cloud 專案中的批次匯入工作。

網路使用者介面

  1. 在 Google Cloud 控制台的 Vertex AI 專區,前往「Features」頁面。

    前往「Features」(功能) 頁面

  2. 從「Region」(區域) 下拉式清單中選取一個區域。
  3. 在動作列中,按一下「查看擷取作業」,列出所有特徵商店的匯入作業。
  4. 按一下匯入工作的 ID,即可查看詳細資料,例如資料來源、匯入的實體數量,以及匯入的特徵值數量。

覆寫 featurestore 中的現有資料

如果現有特徵值和重新匯入的值具有相同時間戳記,您可以重新匯入值來覆寫現有特徵值。您不需要先刪除現有的特徵值。舉例來說,您可能依賴最近變更的基礎來源資料。如要讓特徵商店與基礎資料保持一致,請再次匯入特徵值。如果時間戳記不相符,系統會將匯入的值視為不重複,且舊值會繼續存在 (不會遭到覆寫)。

為確保線上和批次服務要求之間的一致性,請等待匯入作業完成,再提出任何服務要求。

補充歷來資料

如果您要回填資料 (即匯入過去的特徵值),請為匯入工作停用線上放送。線上供應服務僅用於供應最新的特徵值,不包括回填。停用線上服務很有用,因為這樣可以消除線上服務節點的任何負載,並提高匯入工作的輸送量,進而縮短完成時間。

使用 API 或用戶端程式庫時,您可以停用匯入工作的線上服務。詳情請參閱 importFeatureValue 方法disableOnlineServing 欄位。

後續步驟