Create a training pipeline for custom job

Creates a training pipeline for custom job using the create_training_pipeline method.

Explore further

For detailed documentation that includes this code sample, see the following:

Code sample

Java

Before trying this sample, follow the Java setup instructions in the Vertex AI quickstart using client libraries. For more information, see the Vertex AI Java API reference documentation.

To authenticate to Vertex AI, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

import com.google.cloud.aiplatform.v1.LocationName;
import com.google.cloud.aiplatform.v1.Model;
import com.google.cloud.aiplatform.v1.ModelContainerSpec;
import com.google.cloud.aiplatform.v1.PipelineServiceClient;
import com.google.cloud.aiplatform.v1.PipelineServiceSettings;
import com.google.cloud.aiplatform.v1.TrainingPipeline;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.protobuf.Value;
import com.google.protobuf.util.JsonFormat;
import java.io.IOException;

public class CreateTrainingPipelineCustomJobSample {

  public static void main(String[] args) throws IOException {
    // TODO(developer): Replace these variables before running the sample.
    String project = "PROJECT";
    String displayName = "DISPLAY_NAME";
    String modelDisplayName = "MODEL_DISPLAY_NAME";
    String containerImageUri = "CONTAINER_IMAGE_URI";
    String baseOutputDirectoryPrefix = "BASE_OUTPUT_DIRECTORY_PREFIX";
    createTrainingPipelineCustomJobSample(
        project, displayName, modelDisplayName, containerImageUri, baseOutputDirectoryPrefix);
  }

  static void createTrainingPipelineCustomJobSample(
      String project,
      String displayName,
      String modelDisplayName,
      String containerImageUri,
      String baseOutputDirectoryPrefix)
      throws IOException {
    PipelineServiceSettings settings =
        PipelineServiceSettings.newBuilder()
            .setEndpoint("us-central1-aiplatform.googleapis.com:443")
            .build();
    String location = "us-central1";

    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources.
    try (PipelineServiceClient client = PipelineServiceClient.create(settings)) {
      JsonObject jsonMachineSpec = new JsonObject();
      jsonMachineSpec.addProperty("machineType", "n1-standard-4");

      // A working docker image can be found at
      // gs://cloud-samples-data/ai-platform/mnist_tfrecord/custom_job
      // This sample image accepts a set of arguments including model_dir.
      JsonObject jsonContainerSpec = new JsonObject();
      jsonContainerSpec.addProperty("imageUri", containerImageUri);
      JsonArray jsonArgs = new JsonArray();
      jsonArgs.add("--model_dir=$(AIP_MODEL_DIR)");
      jsonContainerSpec.add("args", jsonArgs);

      JsonObject jsonJsonWorkerPoolSpec0 = new JsonObject();
      jsonJsonWorkerPoolSpec0.addProperty("replicaCount", 1);
      jsonJsonWorkerPoolSpec0.add("machineSpec", jsonMachineSpec);
      jsonJsonWorkerPoolSpec0.add("containerSpec", jsonContainerSpec);

      JsonArray jsonWorkerPoolSpecs = new JsonArray();
      jsonWorkerPoolSpecs.add(jsonJsonWorkerPoolSpec0);

      JsonObject jsonBaseOutputDirectory = new JsonObject();
      // The GCS location for outputs must be accessible by the project's AI Platform
      // service account.
      jsonBaseOutputDirectory.addProperty("output_uri_prefix", baseOutputDirectoryPrefix);

      JsonObject jsonTrainingTaskInputs = new JsonObject();
      jsonTrainingTaskInputs.add("workerPoolSpecs", jsonWorkerPoolSpecs);
      jsonTrainingTaskInputs.add("baseOutputDirectory", jsonBaseOutputDirectory);

      Value.Builder trainingTaskInputsBuilder = Value.newBuilder();
      JsonFormat.parser().merge(jsonTrainingTaskInputs.toString(), trainingTaskInputsBuilder);
      Value trainingTaskInputs = trainingTaskInputsBuilder.build();
      String trainingTaskDefinition =
          "gs://google-cloud-aiplatform/schema/trainingjob/definition/custom_task_1.0.0.yaml";
      String imageUri = "gcr.io/cloud-aiplatform/prediction/tf-cpu.1-15:latest";
      ModelContainerSpec containerSpec =
          ModelContainerSpec.newBuilder().setImageUri(imageUri).build();
      Model modelToUpload =
          Model.newBuilder()
              .setDisplayName(modelDisplayName)
              .setContainerSpec(containerSpec)
              .build();
      TrainingPipeline trainingPipeline =
          TrainingPipeline.newBuilder()
              .setDisplayName(displayName)
              .setTrainingTaskDefinition(trainingTaskDefinition)
              .setTrainingTaskInputs(trainingTaskInputs)
              .setModelToUpload(modelToUpload)
              .build();
      LocationName parent = LocationName.of(project, location);
      TrainingPipeline response = client.createTrainingPipeline(parent, trainingPipeline);
      System.out.format("response: %s\n", response);
      System.out.format("Name: %s\n", response.getName());
    }
  }
}

Python

Before trying this sample, follow the Python setup instructions in the Vertex AI quickstart using client libraries. For more information, see the Vertex AI Python API reference documentation.

To authenticate to Vertex AI, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

from google.cloud import aiplatform
from google.protobuf import json_format
from google.protobuf.struct_pb2 import Value


def create_training_pipeline_custom_job_sample(
    project: str,
    display_name: str,
    model_display_name: str,
    container_image_uri: str,
    base_output_directory_prefix: str,
    location: str = "us-central1",
    api_endpoint: str = "us-central1-aiplatform.googleapis.com",
):
    # The AI Platform services require regional API endpoints.
    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    # This client only needs to be created once, and can be reused for multiple requests.
    client = aiplatform.gapic.PipelineServiceClient(client_options=client_options)

    training_task_inputs_dict = {
        "workerPoolSpecs": [
            {
                "replicaCount": 1,
                "machineSpec": {"machineType": "n1-standard-4"},
                "containerSpec": {
                    # A working docker image can be found at gs://cloud-samples-data/ai-platform/mnist_tfrecord/custom_job
                    "imageUri": container_image_uri,
                    "args": [
                        # AIP_MODEL_DIR is set by the service according to baseOutputDirectory.
                        "--model_dir=$(AIP_MODEL_DIR)",
                    ],
                },
            }
        ],
        "baseOutputDirectory": {
            # The GCS location for outputs must be accessible by the project's AI Platform service account.
            "output_uri_prefix": base_output_directory_prefix
        },
    }
    training_task_inputs = json_format.ParseDict(training_task_inputs_dict, Value())

    training_task_definition = "gs://google-cloud-aiplatform/schema/trainingjob/definition/custom_task_1.0.0.yaml"
    image_uri = "gcr.io/cloud-aiplatform/prediction/tf-cpu.1-15:latest"

    training_pipeline = {
        "display_name": display_name,
        "training_task_definition": training_task_definition,
        "training_task_inputs": training_task_inputs,
        "model_to_upload": {
            "display_name": model_display_name,
            "container_spec": {"image_uri": image_uri},
        },
    }
    parent = f"projects/{project}/locations/{location}"
    response = client.create_training_pipeline(
        parent=parent, training_pipeline=training_pipeline
    )
    print("response:", response)

What's next

To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.