获取 Gemini 的批量预测结果

通过批量预测,您可以在单个批量请求中发送大量多模态提示。

如需详细了解批处理工作流以及如何设置输入数据的格式,请参阅获取 Gemini 的批量预测结果

支持的模型

模型 版本
Gemini 1.5 Flash gemini-1.5-flash-002
gemini-1.5-flash-001
Gemini 1.5 Pro gemini-1.5-pro-002
gemini-1.5-pro-001
Gemini 1.0 Pro gemini-1.0-pro-001
gemini-1.0-pro-002

示例语法

以下语法展示了如何使用 curl 命令发送批量预测 API 请求。此示例仅适用于 BigQuery 存储空间。

curl -X POST \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  https://${LOCATION}-aiplatform.googleapis.com/v1/projects/${PROJECT_ID}/locations/${LOCATION}/batchPredictionJobs \
  -d '{
      "displayName": "...",
      "model": "publishers/google/models/${MODEL_ID}",
      "inputConfig": {
        "instancesFormat": "bigquery",
        "bigquerySource": {
          "inputUri" : "..."
        }
      },
      "outputConfig": {
        "predictionsFormat": "bigquery",
        "bigqueryDestination": {
          "outputUri": "..."
        }
      }
  }'

参数

如需了解实现详情,请参阅示例

正文请求

参数

displayName

您为作业选择的名称。

model

用于批量预测的模型。

inputConfig

数据格式。对于 Gemini 批量预测,支持 Cloud Storage 和 BigQuery 输入源。

outputConfig

用于确定模型输出位置的输出配置。 支持 Cloud Storage 和 BigQuery 输出位置。

inputConfig

参数

instancesFormat

提示输入格式。对于 Cloud Storage,请使用 jsonl;对于 BigQuery,请使用 bigquery

gcsSource.uris

输入来源的 URI。这是 JSONL 文件的 Cloud Storage 位置,采用 gs://bucketname/path/to/file.jsonl 格式。

bigquerySource.inputUri

输入来源的 URI。这是 BigQuery 表 URI,格式为 bq://project_id.dataset.table。输入 BigQuery 数据集的区域必须与 Vertex AI 批量预测作业的区域相同。

outputConfig

参数

predictionsFormat

预测结果的输出格式。使用 bigquery

gcsDestination.outputUriPrefix

Cloud Storage 存储桶和目录位置,格式为 gs://mybucket/path/to/output

bigqueryDestination.outputUri

目标输出表的 BigQuery URI,格式为 bq://project_id.dataset.table。如果该表尚不存在,则系统会为您创建。输出 BigQuery 数据集的区域必须与 Vertex AI 批量预测作业的区域相同。

示例

请求批量响应

多模态模型的批量请求接受 Cloud Storage 存储空间和 BigQuery 存储来源。如需了解详情,请参阅以下内容:

批量生成任务可能需要一些时间才能完成,具体取决于提交的输入数据项的数量。

REST

如需创建批量预测作业,请使用 projects.locations.batchPredictionJobs.create 方法。

Cloud Storage 输入

在使用任何请求数据之前,请先进行以下替换:

  • LOCATION:支持 Gemini 模型的区域。
  • PROJECT_ID:您的项目 ID
  • INPUT_URI:JSONL 批量预测输入(例如 gs://bucketname/path/to/file.jsonl)的 Cloud Storage 位置。
  • OUTPUT_FORMAT:如需输出到 BigQuery 表,请指定 bigquery。如需输出到 Cloud Storage 存储桶,请指定 jsonl
  • DESTINATION:对于 BigQuery,请指定 bigqueryDestination。对于 Cloud Storage,请指定 gcsDestination
  • OUTPUT_URI_FIELD_NAME:对于 BigQuery,请指定 outputUri。对于 Cloud Storage,请指定 outputUriPrefix
  • OUTPUT_URI:对于 BigQuery,请指定表位置,例如 bq://myproject.mydataset.output_result。输出 BigQuery 数据集的区域必须与 Vertex AI 批量预测作业的区域相同。 对于 Cloud Storage,请指定存储桶和目录位置,例如 gs://mybucket/path/to/output

请求 JSON 正文:

{
  "displayName": "my-cloud-storage-batch-prediction-job",
  "model": "publishers/google/models/gemini-1.5-flash-002",
  "inputConfig": {
    "instancesFormat": "jsonl",
    "gcsSource": {
      "uris" : "INPUT_URI"
    }
  },
  "outputConfig": {
    "predictionsFormat": "OUTPUT_FORMAT",
    "DESTINATION": {
      "OUTPUT_URI_FIELD_NAME": "OUTPUT_URI"
    }
  }
}

如需发送请求,请选择以下方式之一:

curl

将请求正文保存在名为 request.json 的文件中,然后执行以下命令:

curl -X POST \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json; charset=utf-8" \
-d @request.json \
"https://LOCATION-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/batchPredictionJobs"

PowerShell

将请求正文保存在名为 request.json 的文件中,然后执行以下命令:

$cred = gcloud auth print-access-token
$headers = @{ "Authorization" = "Bearer $cred" }

Invoke-WebRequest `
-Method POST `
-Headers $headers `
-ContentType: "application/json; charset=utf-8" `
-InFile request.json `
-Uri "https://LOCATION-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/batchPredictionJobs" | Select-Object -Expand Content

您应该收到类似以下内容的 JSON 响应。

BigQuery 输入

在使用任何请求数据之前,请先进行以下替换:

  • LOCATION:支持 Gemini 模型的区域。
  • PROJECT_ID:您的项目 ID
  • INPUT_URI:批量预测输入所在的 BigQuery 表,例如 bq://myproject.mydataset.input_table。不支持多区域数据集。
  • OUTPUT_FORMAT:如需输出到 BigQuery 表,请指定 bigquery。如需输出到 Cloud Storage 存储桶,请指定 jsonl
  • DESTINATION:对于 BigQuery,请指定 bigqueryDestination。对于 Cloud Storage,请指定 gcsDestination
  • OUTPUT_URI_FIELD_NAME:对于 BigQuery,请指定 outputUri。对于 Cloud Storage,请指定 outputUriPrefix
  • OUTPUT_URI:对于 BigQuery,请指定表位置,例如 bq://myproject.mydataset.output_result。输出 BigQuery 数据集的区域必须与 Vertex AI 批量预测作业的区域相同。 对于 Cloud Storage,请指定存储桶和目录位置,例如 gs://mybucket/path/to/output

请求 JSON 正文:

{
  "displayName": "my-bigquery-batch-prediction-job",
  "model": "publishers/google/models/gemini-1.5-flash-002",
  "inputConfig": {
    "instancesFormat": "bigquery",
    "bigquerySource":{
      "inputUri" : "INPUT_URI"
    }
  },
  "outputConfig": {
    "predictionsFormat": "OUTPUT_FORMAT",
    "DESTINATION": {
      "OUTPUT_URI_FIELD_NAME": "OUTPUT_URI"
    }
  }
}

如需发送请求,请选择以下方式之一:

curl

将请求正文保存在名为 request.json 的文件中,然后执行以下命令:

curl -X POST \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json; charset=utf-8" \
-d @request.json \
"https://LOCATION-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/batchPredictionJobs"

PowerShell

将请求正文保存在名为 request.json 的文件中,然后执行以下命令:

$cred = gcloud auth print-access-token
$headers = @{ "Authorization" = "Bearer $cred" }

Invoke-WebRequest `
-Method POST `
-Headers $headers `
-ContentType: "application/json; charset=utf-8" `
-InFile request.json `
-Uri "https://LOCATION-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/batchPredictionJobs" | Select-Object -Expand Content

您应该收到类似以下内容的 JSON 响应。

响应包含批量作业的唯一标识符。您可以使用 BATCH_JOB_ID 轮询批量作业的状态,直到作业 stateJOB_STATE_SUCCEEDED。 例如:

curl -X GET \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
https://us-central1-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/us-central1/batchPredictionJobs/BATCH_JOB_ID

Python

如需了解如何安装或更新 Vertex AI SDK for Python,请参阅安装 Vertex AI SDK for Python。 如需了解详情,请参阅 Python API 参考文档

Cloud Storage 输入

import time
import vertexai

from vertexai.batch_prediction import BatchPredictionJob

# TODO(developer): Update and un-comment below line
# PROJECT_ID = "your-project-id"

# Initialize vertexai
vertexai.init(project=PROJECT_ID, location="us-central1")

input_uri = "gs://cloud-samples-data/batch/prompt_for_batch_gemini_predict.jsonl"

# Submit a batch prediction job with Gemini model
batch_prediction_job = BatchPredictionJob.submit(
    source_model="gemini-1.5-flash-002",
    input_dataset=input_uri,
    output_uri_prefix=output_uri,
)

# Check job status
print(f"Job resource name: {batch_prediction_job.resource_name}")
print(f"Model resource name with the job: {batch_prediction_job.model_name}")
print(f"Job state: {batch_prediction_job.state.name}")

# Refresh the job until complete
while not batch_prediction_job.has_ended:
    time.sleep(5)
    batch_prediction_job.refresh()

# Check if the job succeeds
if batch_prediction_job.has_succeeded:
    print("Job succeeded!")
else:
    print(f"Job failed: {batch_prediction_job.error}")

# Check the location of the output
print(f"Job output location: {batch_prediction_job.output_location}")

# Example response:
#  Job output location: gs://your-bucket/gen-ai-batch-prediction/prediction-model-year-month-day-hour:minute:second.12345

BigQuery 输入

import time
import vertexai

from vertexai.batch_prediction import BatchPredictionJob

# TODO(developer): Update and un-comment below line
# PROJECT_ID = "your-project-id"

# Initialize vertexai
vertexai.init(project=PROJECT_ID, location="us-central1")

input_uri = "bq://storage-samples.generative_ai.batch_requests_for_multimodal_input"

# Submit a batch prediction job with Gemini model
batch_prediction_job = BatchPredictionJob.submit(
    source_model="gemini-1.5-flash-002",
    input_dataset=input_uri,
    output_uri_prefix=output_uri,
)

# Check job status
print(f"Job resource name: {batch_prediction_job.resource_name}")
print(f"Model resource name with the job: {batch_prediction_job.model_name}")
print(f"Job state: {batch_prediction_job.state.name}")

# Refresh the job until complete
while not batch_prediction_job.has_ended:
    time.sleep(5)
    batch_prediction_job.refresh()

# Check if the job succeeds
if batch_prediction_job.has_succeeded:
    print("Job succeeded!")
else:
    print(f"Job failed: {batch_prediction_job.error}")

# Check the location of the output
print(f"Job output location: {batch_prediction_job.output_location}")

# Example response:
#  Job output location: bq://Project-ID/gen-ai-batch-prediction/predictions-model-year-month-day-hour:minute:second.12345

Node.js

在尝试此示例之前,请按照《Vertex AI 快速入门:使用客户端库》中的 Node.js 设置说明执行操作。 如需了解详情,请参阅 Vertex AI Node.js API 参考文档

如需向 Vertex AI 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

Cloud Storage 输入

// Import the aiplatform library
const aiplatformLib = require('@google-cloud/aiplatform');
const aiplatform = aiplatformLib.protos.google.cloud.aiplatform.v1;

/**
 * TODO(developer):  Uncomment/update these variables before running the sample.
 */
// projectId = 'YOUR_PROJECT_ID';
// URI of the output folder in Google Cloud Storage.
// E.g. "gs://[BUCKET]/[OUTPUT]"
// outputUri = 'gs://my-bucket';

// URI of the input file in Google Cloud Storage.
// E.g. "gs://[BUCKET]/[DATASET].jsonl"
// Or try:
// "gs://cloud-samples-data/generative-ai/batch/gemini_multimodal_batch_predict.jsonl"
// for a batch prediction that uses audio, video, and an image.
const inputUri =
  'gs://cloud-samples-data/generative-ai/batch/batch_requests_for_multimodal_input.jsonl';
const location = 'us-central1';
const parent = `projects/${projectId}/locations/${location}`;
const modelName = `${parent}/publishers/google/models/gemini-1.5-flash-002`;

// Specify the location of the api endpoint.
const clientOptions = {
  apiEndpoint: `${location}-aiplatform.googleapis.com`,
};

// Instantiate the client.
const jobServiceClient = new aiplatformLib.JobServiceClient(clientOptions);

// Create a Gemini batch prediction job using Google Cloud Storage input and output buckets.
async function create_batch_prediction_gemini_gcs() {
  const gcsSource = new aiplatform.GcsSource({
    uris: [inputUri],
  });

  const inputConfig = new aiplatform.BatchPredictionJob.InputConfig({
    gcsSource: gcsSource,
    instancesFormat: 'jsonl',
  });

  const gcsDestination = new aiplatform.GcsDestination({
    outputUriPrefix: outputUri,
  });

  const outputConfig = new aiplatform.BatchPredictionJob.OutputConfig({
    gcsDestination: gcsDestination,
    predictionsFormat: 'jsonl',
  });

  const batchPredictionJob = new aiplatform.BatchPredictionJob({
    displayName: 'Batch predict with Gemini - GCS',
    model: modelName,
    inputConfig: inputConfig,
    outputConfig: outputConfig,
  });

  const request = {
    parent: parent,
    batchPredictionJob,
  };

  // Create batch prediction job request
  const [response] = await jobServiceClient.createBatchPredictionJob(request);
  console.log('Response name: ', response.name);
  // Example response:
  // Response name: projects/<project>/locations/us-central1/batchPredictionJobs/<job-id>
}

await create_batch_prediction_gemini_gcs();

BigQuery 输入

// Import the aiplatform library
const aiplatformLib = require('@google-cloud/aiplatform');
const aiplatform = aiplatformLib.protos.google.cloud.aiplatform.v1;

/**
 * TODO(developer):  Uncomment/update these variables before running the sample.
 */
// projectId = 'YOUR_PROJECT_ID';
// URI of the output BigQuery table.
// E.g. "bq://[PROJECT].[DATASET].[TABLE]"
// outputUri = 'bq://projectid.dataset.table';

// URI of the multimodal input BigQuery table.
// E.g. "bq://[PROJECT].[DATASET].[TABLE]"
const inputUri =
  'bq://storage-samples.generative_ai.batch_requests_for_multimodal_input';
const location = 'us-central1';
const parent = `projects/${projectId}/locations/${location}`;
const modelName = `${parent}/publishers/google/models/gemini-1.5-flash-002`;

// Specify the location of the api endpoint.
const clientOptions = {
  apiEndpoint: `${location}-aiplatform.googleapis.com`,
};

// Instantiate the client.
const jobServiceClient = new aiplatformLib.JobServiceClient(clientOptions);

// Create a Gemini batch prediction job using BigQuery input and output datasets.
async function create_batch_prediction_gemini_bq() {
  const bqSource = new aiplatform.BigQuerySource({
    inputUri: inputUri,
  });

  const inputConfig = new aiplatform.BatchPredictionJob.InputConfig({
    bigquerySource: bqSource,
    instancesFormat: 'bigquery',
  });

  const bqDestination = new aiplatform.BigQueryDestination({
    outputUri: outputUri,
  });

  const outputConfig = new aiplatform.BatchPredictionJob.OutputConfig({
    bigqueryDestination: bqDestination,
    predictionsFormat: 'bigquery',
  });

  const batchPredictionJob = new aiplatform.BatchPredictionJob({
    displayName: 'Batch predict with Gemini - BigQuery',
    model: modelName, // Add model parameters per request in the input BigQuery table.
    inputConfig: inputConfig,
    outputConfig: outputConfig,
  });

  const request = {
    parent: parent,
    batchPredictionJob,
  };

  // Create batch prediction job request
  const [response] = await jobServiceClient.createBatchPredictionJob(request);
  console.log('Response name: ', response.name);
  // Example response:
  // Response name: projects/<project>/locations/us-central1/batchPredictionJobs/<job-id>
}

await create_batch_prediction_gemini_bq();

Java

在尝试此示例之前,请按照《Vertex AI 快速入门:使用客户端库》中的 Java 设置说明执行操作。 如需了解详情,请参阅 Vertex AI Java API 参考文档

如需向 Vertex AI 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

Cloud Storage 输入

import com.google.cloud.aiplatform.v1.BatchPredictionJob;
import com.google.cloud.aiplatform.v1.GcsDestination;
import com.google.cloud.aiplatform.v1.GcsSource;
import com.google.cloud.aiplatform.v1.JobServiceClient;
import com.google.cloud.aiplatform.v1.JobServiceSettings;
import com.google.cloud.aiplatform.v1.LocationName;
import java.io.IOException;

public class CreateBatchPredictionGeminiJobSample {

  public static void main(String[] args) throws IOException {
    // TODO(developer): Update these variables before running the sample.
    String project = "PROJECT_ID";
    String gcsDestinationOutputUriPrefix = "gs://MY_BUCKET/";

    createBatchPredictionGeminiJobSample(project, gcsDestinationOutputUriPrefix);
  }

  // Create a batch prediction job using a JSONL input file and output URI, both in Cloud
  // Storage.
  public static BatchPredictionJob createBatchPredictionGeminiJobSample(
      String project, String gcsDestinationOutputUriPrefix) throws IOException {
    String location = "us-central1";
    JobServiceSettings settings =
        JobServiceSettings.newBuilder()
            .setEndpoint(String.format("%s-aiplatform.googleapis.com:443", location))
            .build();

    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests.
    try (JobServiceClient client = JobServiceClient.create(settings)) {
      GcsSource gcsSource =
          GcsSource.newBuilder()
              .addUris(
                  "gs://cloud-samples-data/generative-ai/batch/"
                      + "batch_requests_for_multimodal_input.jsonl")
              // Or try
              // "gs://cloud-samples-data/generative-ai/batch/gemini_multimodal_batch_predict.jsonl"
              // for a batch prediction that uses audio, video, and an image.
              .build();
      BatchPredictionJob.InputConfig inputConfig =
          BatchPredictionJob.InputConfig.newBuilder()
              .setInstancesFormat("jsonl")
              .setGcsSource(gcsSource)
              .build();
      GcsDestination gcsDestination =
          GcsDestination.newBuilder().setOutputUriPrefix(gcsDestinationOutputUriPrefix).build();
      BatchPredictionJob.OutputConfig outputConfig =
          BatchPredictionJob.OutputConfig.newBuilder()
              .setPredictionsFormat("jsonl")
              .setGcsDestination(gcsDestination)
              .build();
      String modelName =
          String.format(
              "projects/%s/locations/%s/publishers/google/models/%s",
              project, location, "gemini-1.5-flash-002");

      BatchPredictionJob batchPredictionJob =
          BatchPredictionJob.newBuilder()
              .setDisplayName("my-display-name")
              .setModel(modelName) // Add model parameters per request in the input jsonl file.
              .setInputConfig(inputConfig)
              .setOutputConfig(outputConfig)
              .build();

      LocationName parent = LocationName.of(project, location);
      BatchPredictionJob response = client.createBatchPredictionJob(parent, batchPredictionJob);
      System.out.format("\tName: %s\n", response.getName());
      // Example response:
      //   Name: projects/<project>/locations/us-central1/batchPredictionJobs/<job-id>
      return response;
    }
  }
}

BigQuery 输入

import com.google.cloud.aiplatform.v1.BatchPredictionJob;
import com.google.cloud.aiplatform.v1.BigQueryDestination;
import com.google.cloud.aiplatform.v1.BigQuerySource;
import com.google.cloud.aiplatform.v1.JobServiceClient;
import com.google.cloud.aiplatform.v1.JobServiceSettings;
import com.google.cloud.aiplatform.v1.LocationName;
import java.io.IOException;

public class CreateBatchPredictionGeminiBigqueryJobSample {

  public static void main(String[] args) throws IOException {
    // TODO(developer): Update these variables before running the sample.
    String project = "PROJECT_ID";
    String bigqueryDestinationOutputUri = "bq://PROJECT_ID.MY_DATASET.MY_TABLE";

    createBatchPredictionGeminiBigqueryJobSample(project, bigqueryDestinationOutputUri);
  }

  // Create a batch prediction job using BigQuery input and output datasets.
  public static BatchPredictionJob createBatchPredictionGeminiBigqueryJobSample(
      String project, String bigqueryDestinationOutputUri) throws IOException {
    String location = "us-central1";
    JobServiceSettings settings =
        JobServiceSettings.newBuilder()
            .setEndpoint(String.format("%s-aiplatform.googleapis.com:443", location))
            .build();

    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests.
    try (JobServiceClient client = JobServiceClient.create(settings)) {
      BigQuerySource bigquerySource =
          BigQuerySource.newBuilder()
              .setInputUri("bq://storage-samples.generative_ai.batch_requests_for_multimodal_input")
              .build();
      BatchPredictionJob.InputConfig inputConfig =
          BatchPredictionJob.InputConfig.newBuilder()
              .setInstancesFormat("bigquery")
              .setBigquerySource(bigquerySource)
              .build();
      BigQueryDestination bigqueryDestination =
          BigQueryDestination.newBuilder().setOutputUri(bigqueryDestinationOutputUri).build();
      BatchPredictionJob.OutputConfig outputConfig =
          BatchPredictionJob.OutputConfig.newBuilder()
              .setPredictionsFormat("bigquery")
              .setBigqueryDestination(bigqueryDestination)
              .build();
      String modelName =
          String.format(
              "projects/%s/locations/%s/publishers/google/models/%s",
              project, location, "gemini-1.5-flash-002");

      BatchPredictionJob batchPredictionJob =
          BatchPredictionJob.newBuilder()
              .setDisplayName("my-display-name")
              .setModel(modelName) // Add model parameters per request in the input BigQuery table.
              .setInputConfig(inputConfig)
              .setOutputConfig(outputConfig)
              .build();

      LocationName parent = LocationName.of(project, location);
      BatchPredictionJob response = client.createBatchPredictionJob(parent, batchPredictionJob);
      System.out.format("\tName: %s\n", response.getName());
      // Example response:
      //   Name: projects/<project>/locations/us-central1/batchPredictionJobs/<job-id>
      return response;
    }
  }
}

Go

在尝试此示例之前,请按照《Vertex AI 快速入门:使用客户端库》中的 Go 设置说明执行操作。 如需了解详情,请参阅 Vertex AI Go API 参考文档

如需向 Vertex AI 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

Cloud Storage 输入

import (
	"context"
	"fmt"
	"io"
	"time"

	aiplatform "cloud.google.com/go/aiplatform/apiv1"
	aiplatformpb "cloud.google.com/go/aiplatform/apiv1/aiplatformpb"

	"google.golang.org/api/option"
	"google.golang.org/protobuf/types/known/structpb"
)

// batchPredictGCS submits a batch prediction job using GCS data source as its input
func batchPredictGCS(w io.Writer, projectID, location string, inputURIs []string, outputURI string) error {
	// location := "us-central1"
	// inputURIs := []string{"gs://cloud-samples-data/batch/prompt_for_batch_gemini_predict.jsonl"}
	// outputURI := "gs://<cloud-bucket-name>/<prefix-name>"
	modelName := "gemini-1.5-pro-002"
	jobName := "batch-predict-gcs-test-001"

	ctx := context.Background()
	apiEndpoint := fmt.Sprintf("%s-aiplatform.googleapis.com:443", location)
	client, err := aiplatform.NewJobClient(ctx, option.WithEndpoint(apiEndpoint))
	if err != nil {
		return fmt.Errorf("unable to create aiplatform client: %w", err)
	}
	defer client.Close()

	modelParameters, err := structpb.NewValue(map[string]interface{}{
		"temperature":     0.2,
		"maxOutputTokens": 200,
	})
	if err != nil {
		return fmt.Errorf("unable to convert model parameters to protobuf value: %w", err)
	}

	req := &aiplatformpb.CreateBatchPredictionJobRequest{
		Parent: fmt.Sprintf("projects/%s/locations/%s", projectID, location),
		BatchPredictionJob: &aiplatformpb.BatchPredictionJob{
			DisplayName:     jobName,
			Model:           fmt.Sprintf("publishers/google/models/%s", modelName),
			ModelParameters: modelParameters,
			// Check the API reference for `BatchPredictionJob` for supported input and output formats:
			// https://cloud.google.com/vertex-ai/docs/reference/rpc/google.cloud.aiplatform.v1#google.cloud.aiplatform.v1.BatchPredictionJob
			InputConfig: &aiplatformpb.BatchPredictionJob_InputConfig{
				Source: &aiplatformpb.BatchPredictionJob_InputConfig_GcsSource{
					GcsSource: &aiplatformpb.GcsSource{
						Uris: inputURIs,
					},
				},
				InstancesFormat: "jsonl",
			},
			OutputConfig: &aiplatformpb.BatchPredictionJob_OutputConfig{
				Destination: &aiplatformpb.BatchPredictionJob_OutputConfig_GcsDestination{
					GcsDestination: &aiplatformpb.GcsDestination{
						OutputUriPrefix: outputURI,
					},
				},
				PredictionsFormat: "jsonl",
			},
		},
	}

	job, err := client.CreateBatchPredictionJob(ctx, req)
	if err != nil {
		return err
	}
	fullJobId := job.GetName()
	fmt.Fprintf(w, "submitted batch predict job for model %q\n", job.GetModel())
	fmt.Fprintf(w, "job id: %q\n", fullJobId)
	fmt.Fprintf(w, "job state: %s\n", job.GetState())
	// Example response:
	// submitted batch predict job for model "publishers/google/models/gemini-1.5-pro-002"
	// job id: "projects/.../locations/.../batchPredictionJobs/1234567890000000000"
	// job state: JOB_STATE_PENDING

	for {
		time.Sleep(5 * time.Second)

		job, err := client.GetBatchPredictionJob(ctx, &aiplatformpb.GetBatchPredictionJobRequest{
			Name: fullJobId,
		})
		if err != nil {
			return fmt.Errorf("error: couldn't get updated job state: %w", err)
		}

		if job.GetEndTime() != nil {
			fmt.Fprintf(w, "batch predict job finished with state %s\n", job.GetState())
			break
		} else {
			fmt.Fprintf(w, "batch predict job is running... job state is %s\n", job.GetState())
		}
	}

	return nil
}

BigQuery 输入

import (
	"context"
	"fmt"
	"io"
	"time"

	aiplatform "cloud.google.com/go/aiplatform/apiv1"
	aiplatformpb "cloud.google.com/go/aiplatform/apiv1/aiplatformpb"

	"google.golang.org/api/option"
	"google.golang.org/protobuf/types/known/structpb"
)

// batchPredictBQ submits a batch prediction job using BigQuery data source as its input
func batchPredictBQ(w io.Writer, projectID, location string, inputURI string, outputURI string) error {
	// location  := "us-central1"
	// inputURI  := "bq://storage-samples.generative_ai.batch_requests_for_multimodal_input"
	// outputURI := "bq://<cloud-project-name>.<dataset-name>.<table-name>"
	modelName := "gemini-1.5-pro-002"
	jobName := "batch-predict-bq-test-001"

	ctx := context.Background()
	apiEndpoint := fmt.Sprintf("%s-aiplatform.googleapis.com:443", location)
	client, err := aiplatform.NewJobClient(ctx, option.WithEndpoint(apiEndpoint))
	if err != nil {
		return fmt.Errorf("unable to create aiplatform client: %w", err)
	}
	defer client.Close()

	modelParameters, err := structpb.NewValue(map[string]interface{}{
		"temperature":     0.2,
		"maxOutputTokens": 200,
	})
	if err != nil {
		return fmt.Errorf("unable to convert model parameters to protobuf value: %w", err)
	}

	req := &aiplatformpb.CreateBatchPredictionJobRequest{
		Parent: fmt.Sprintf("projects/%s/locations/%s", projectID, location),
		BatchPredictionJob: &aiplatformpb.BatchPredictionJob{
			DisplayName:     jobName,
			Model:           fmt.Sprintf("publishers/google/models/%s", modelName),
			ModelParameters: modelParameters,
			// Check the API reference for `BatchPredictionJob` for supported input and output formats:
			// https://cloud.google.com/vertex-ai/docs/reference/rpc/google.cloud.aiplatform.v1#google.cloud.aiplatform.v1.BatchPredictionJob
			InputConfig: &aiplatformpb.BatchPredictionJob_InputConfig{
				Source: &aiplatformpb.BatchPredictionJob_InputConfig_BigquerySource{
					BigquerySource: &aiplatformpb.BigQuerySource{
						InputUri: inputURI,
					},
				},
				InstancesFormat: "bigquery",
			},

			OutputConfig: &aiplatformpb.BatchPredictionJob_OutputConfig{
				Destination: &aiplatformpb.BatchPredictionJob_OutputConfig_BigqueryDestination{
					BigqueryDestination: &aiplatformpb.BigQueryDestination{
						OutputUri: outputURI,
					},
				},
				PredictionsFormat: "bigquery",
			},
		},
	}

	job, err := client.CreateBatchPredictionJob(ctx, req)
	if err != nil {
		return err
	}
	fullJobId := job.GetName()
	fmt.Fprintf(w, "submitted batch predict job for model %q\n", job.GetModel())
	fmt.Fprintf(w, "job id: %q\n", fullJobId)
	fmt.Fprintf(w, "job state: %s\n", job.GetState())
	// Example response:
	// submitted batch predict job for model "publishers/google/models/gemini-1.5-pro-002"
	// job id: "projects/.../locations/.../batchPredictionJobs/1234567890000000000"
	// job state: JOB_STATE_PENDING

	for {
		time.Sleep(5 * time.Second)

		job, err := client.GetBatchPredictionJob(ctx, &aiplatformpb.GetBatchPredictionJobRequest{
			Name: fullJobId,
		})
		if err != nil {
			return fmt.Errorf("error: couldn't get updated job state: %w", err)
		}

		if job.GetEndTime() != nil {
			fmt.Fprintf(w, "batch predict job finished with state %s\n", job.GetState())
			break
		} else {
			fmt.Fprintf(w, "batch predict job is running... job state is %s\n", job.GetState())
		}
	}

	return nil
}

检索批量输出

批量预测任务完成后,输出存储在您在请求中指定的 Cloud Storage 存储桶或 BigQuery 表中。

后续步骤