Batch Predict with Gemini using BigQuery data

Perform batch text prediction with Gemini using BigQuery data source as input.

Explore further

For detailed documentation that includes this code sample, see the following:

Code sample

Go

Before trying this sample, follow the Go setup instructions in the Vertex AI quickstart using client libraries. For more information, see the Vertex AI Go API reference documentation.

To authenticate to Vertex AI, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

import (
	"context"
	"fmt"
	"io"
	"time"

	aiplatform "cloud.google.com/go/aiplatform/apiv1"
	aiplatformpb "cloud.google.com/go/aiplatform/apiv1/aiplatformpb"

	"google.golang.org/api/option"
	"google.golang.org/protobuf/types/known/structpb"
)

// batchPredictBQ submits a batch prediction job using BigQuery data source as its input
func batchPredictBQ(w io.Writer, projectID, location string, inputURI string, outputURI string) error {
	// location  := "us-central1"
	// inputURI  := "bq://storage-samples.generative_ai.batch_requests_for_multimodal_input"
	// outputURI := "bq://<cloud-project-name>.<dataset-name>.<table-name>"
	modelName := "gemini-1.5-pro-002"
	jobName := "batch-predict-bq-test-001"

	ctx := context.Background()
	apiEndpoint := fmt.Sprintf("%s-aiplatform.googleapis.com:443", location)
	client, err := aiplatform.NewJobClient(ctx, option.WithEndpoint(apiEndpoint))
	if err != nil {
		return fmt.Errorf("unable to create aiplatform client: %w", err)
	}
	defer client.Close()

	modelParameters, err := structpb.NewValue(map[string]interface{}{
		"temperature":     0.2,
		"maxOutputTokens": 200,
	})
	if err != nil {
		return fmt.Errorf("unable to convert model parameters to protobuf value: %w", err)
	}

	req := &aiplatformpb.CreateBatchPredictionJobRequest{
		Parent: fmt.Sprintf("projects/%s/locations/%s", projectID, location),
		BatchPredictionJob: &aiplatformpb.BatchPredictionJob{
			DisplayName:     jobName,
			Model:           fmt.Sprintf("publishers/google/models/%s", modelName),
			ModelParameters: modelParameters,
			// Check the API reference for `BatchPredictionJob` for supported input and output formats:
			// https://cloud.google.com/vertex-ai/docs/reference/rpc/google.cloud.aiplatform.v1#google.cloud.aiplatform.v1.BatchPredictionJob
			InputConfig: &aiplatformpb.BatchPredictionJob_InputConfig{
				Source: &aiplatformpb.BatchPredictionJob_InputConfig_BigquerySource{
					BigquerySource: &aiplatformpb.BigQuerySource{
						InputUri: inputURI,
					},
				},
				InstancesFormat: "bigquery",
			},

			OutputConfig: &aiplatformpb.BatchPredictionJob_OutputConfig{
				Destination: &aiplatformpb.BatchPredictionJob_OutputConfig_BigqueryDestination{
					BigqueryDestination: &aiplatformpb.BigQueryDestination{
						OutputUri: outputURI,
					},
				},
				PredictionsFormat: "bigquery",
			},
		},
	}

	job, err := client.CreateBatchPredictionJob(ctx, req)
	if err != nil {
		return err
	}
	fullJobId := job.GetName()
	fmt.Fprintf(w, "submitted batch predict job for model %q\n", job.GetModel())
	fmt.Fprintf(w, "job id: %q\n", fullJobId)
	fmt.Fprintf(w, "job state: %s\n", job.GetState())
	// Example response:
	// submitted batch predict job for model "publishers/google/models/gemini-1.5-pro-002"
	// job id: "projects/.../locations/.../batchPredictionJobs/1234567890000000000"
	// job state: JOB_STATE_PENDING

	for {
		time.Sleep(5 * time.Second)

		job, err := client.GetBatchPredictionJob(ctx, &aiplatformpb.GetBatchPredictionJobRequest{
			Name: fullJobId,
		})
		if err != nil {
			return fmt.Errorf("error: couldn't get updated job state: %w", err)
		}

		if job.GetEndTime() != nil {
			fmt.Fprintf(w, "batch predict job finished with state %s\n", job.GetState())
			break
		} else {
			fmt.Fprintf(w, "batch predict job is running... job state is %s\n", job.GetState())
		}
	}

	return nil
}

Java

Before trying this sample, follow the Java setup instructions in the Vertex AI quickstart using client libraries. For more information, see the Vertex AI Java API reference documentation.

To authenticate to Vertex AI, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

import com.google.cloud.aiplatform.v1.BatchPredictionJob;
import com.google.cloud.aiplatform.v1.BigQueryDestination;
import com.google.cloud.aiplatform.v1.BigQuerySource;
import com.google.cloud.aiplatform.v1.JobServiceClient;
import com.google.cloud.aiplatform.v1.JobServiceSettings;
import com.google.cloud.aiplatform.v1.LocationName;
import java.io.IOException;

public class CreateBatchPredictionGeminiBigqueryJobSample {

  public static void main(String[] args) throws IOException {
    // TODO(developer): Update these variables before running the sample.
    String project = "PROJECT_ID";
    String bigqueryDestinationOutputUri = "bq://PROJECT_ID.MY_DATASET.MY_TABLE";

    createBatchPredictionGeminiBigqueryJobSample(project, bigqueryDestinationOutputUri);
  }

  // Create a batch prediction job using BigQuery input and output datasets.
  public static BatchPredictionJob createBatchPredictionGeminiBigqueryJobSample(
      String project, String bigqueryDestinationOutputUri) throws IOException {
    String location = "us-central1";
    JobServiceSettings settings =
        JobServiceSettings.newBuilder()
            .setEndpoint(String.format("%s-aiplatform.googleapis.com:443", location))
            .build();

    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests.
    try (JobServiceClient client = JobServiceClient.create(settings)) {
      BigQuerySource bigquerySource =
          BigQuerySource.newBuilder()
              .setInputUri("bq://storage-samples.generative_ai.batch_requests_for_multimodal_input")
              .build();
      BatchPredictionJob.InputConfig inputConfig =
          BatchPredictionJob.InputConfig.newBuilder()
              .setInstancesFormat("bigquery")
              .setBigquerySource(bigquerySource)
              .build();
      BigQueryDestination bigqueryDestination =
          BigQueryDestination.newBuilder().setOutputUri(bigqueryDestinationOutputUri).build();
      BatchPredictionJob.OutputConfig outputConfig =
          BatchPredictionJob.OutputConfig.newBuilder()
              .setPredictionsFormat("bigquery")
              .setBigqueryDestination(bigqueryDestination)
              .build();
      String modelName =
          String.format(
              "projects/%s/locations/%s/publishers/google/models/%s",
              project, location, "gemini-1.5-flash-002");

      BatchPredictionJob batchPredictionJob =
          BatchPredictionJob.newBuilder()
              .setDisplayName("my-display-name")
              .setModel(modelName) // Add model parameters per request in the input BigQuery table.
              .setInputConfig(inputConfig)
              .setOutputConfig(outputConfig)
              .build();

      LocationName parent = LocationName.of(project, location);
      BatchPredictionJob response = client.createBatchPredictionJob(parent, batchPredictionJob);
      System.out.format("\tName: %s\n", response.getName());
      // Example response:
      //   Name: projects/<project>/locations/us-central1/batchPredictionJobs/<job-id>
      return response;
    }
  }
}

Node.js

Before trying this sample, follow the Node.js setup instructions in the Vertex AI quickstart using client libraries. For more information, see the Vertex AI Node.js API reference documentation.

To authenticate to Vertex AI, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

// Import the aiplatform library
const aiplatformLib = require('@google-cloud/aiplatform');
const aiplatform = aiplatformLib.protos.google.cloud.aiplatform.v1;

/**
 * TODO(developer):  Uncomment/update these variables before running the sample.
 */
// projectId = 'YOUR_PROJECT_ID';
// URI of the output BigQuery table.
// E.g. "bq://[PROJECT].[DATASET].[TABLE]"
// outputUri = 'bq://projectid.dataset.table';

// URI of the multimodal input BigQuery table.
// E.g. "bq://[PROJECT].[DATASET].[TABLE]"
const inputUri =
  'bq://storage-samples.generative_ai.batch_requests_for_multimodal_input';
const location = 'us-central1';
const parent = `projects/${projectId}/locations/${location}`;
const modelName = `${parent}/publishers/google/models/gemini-1.5-flash-002`;

// Specify the location of the api endpoint.
const clientOptions = {
  apiEndpoint: `${location}-aiplatform.googleapis.com`,
};

// Instantiate the client.
const jobServiceClient = new aiplatformLib.JobServiceClient(clientOptions);

// Create a Gemini batch prediction job using BigQuery input and output datasets.
async function create_batch_prediction_gemini_bq() {
  const bqSource = new aiplatform.BigQuerySource({
    inputUri: inputUri,
  });

  const inputConfig = new aiplatform.BatchPredictionJob.InputConfig({
    bigquerySource: bqSource,
    instancesFormat: 'bigquery',
  });

  const bqDestination = new aiplatform.BigQueryDestination({
    outputUri: outputUri,
  });

  const outputConfig = new aiplatform.BatchPredictionJob.OutputConfig({
    bigqueryDestination: bqDestination,
    predictionsFormat: 'bigquery',
  });

  const batchPredictionJob = new aiplatform.BatchPredictionJob({
    displayName: 'Batch predict with Gemini - BigQuery',
    model: modelName, // Add model parameters per request in the input BigQuery table.
    inputConfig: inputConfig,
    outputConfig: outputConfig,
  });

  const request = {
    parent: parent,
    batchPredictionJob,
  };

  // Create batch prediction job request
  const [response] = await jobServiceClient.createBatchPredictionJob(request);
  console.log('Response name: ', response.name);
  // Example response:
  // Response name: projects/<project>/locations/us-central1/batchPredictionJobs/<job-id>
}

await create_batch_prediction_gemini_bq();

Python

Before trying this sample, follow the Python setup instructions in the Vertex AI quickstart using client libraries. For more information, see the Vertex AI Python API reference documentation.

To authenticate to Vertex AI, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

import time
import vertexai

from vertexai.batch_prediction import BatchPredictionJob

# TODO(developer): Update and un-comment below line
# PROJECT_ID = "your-project-id"

# Initialize vertexai
vertexai.init(project=PROJECT_ID, location="us-central1")

input_uri = "bq://storage-samples.generative_ai.batch_requests_for_multimodal_input"

# Submit a batch prediction job with Gemini model
batch_prediction_job = BatchPredictionJob.submit(
    source_model="gemini-1.5-flash-002",
    input_dataset=input_uri,
    output_uri_prefix=output_uri,
)

# Check job status
print(f"Job resource name: {batch_prediction_job.resource_name}")
print(f"Model resource name with the job: {batch_prediction_job.model_name}")
print(f"Job state: {batch_prediction_job.state.name}")

# Refresh the job until complete
while not batch_prediction_job.has_ended:
    time.sleep(5)
    batch_prediction_job.refresh()

# Check if the job succeeds
if batch_prediction_job.has_succeeded:
    print("Job succeeded!")
else:
    print(f"Job failed: {batch_prediction_job.error}")

# Check the location of the output
print(f"Job output location: {batch_prediction_job.output_location}")

# Example response:
#  Job output location: bq://Project-ID/gen-ai-batch-prediction/predictions-model-year-month-day-hour:minute:second.12345

What's next

To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.