Preprocess data with MLTransform

This page explains why and how to use the MLTransform feature to prepare your data for training machine learning (ML) models. By combining multiple data processing transforms in one class, MLTransform streamlines the process of applying Apache Beam ML data processing operations to your workflow.

For information about using MLTransform for embedding generation tasks, see Generate embeddings with MLTransform.

Diagram of the Dataflow ML workflow with the data processing step highlighted.

Figure 1. The complete Dataflow ML workflow. Use MLTransform in the preprocessing step of the workflow.

Benefits

The MLTransform class provides the following benefits:

  • Transform your data without writing complex code or managing underlying libraries.
  • Efficiently chain multiple types of processing operations with one interface.
  • Generate embeddings that you can use to push data into vector databases or to run inference.

    For more information about embedding generation, see Generate embeddings with MLTransform.

Support and limitations

The MLTransform class has the following limitations:

  • Available for pipelines that use the Apache Beam Python SDK versions 2.53.0 and later.
  • Pipelines must use default windows.

Data processing transforms that use TFT:

  • Support Python 3.9, 3.10, 3.11.
  • Support batch pipelines.

Use cases

The example notebooks demonstrate how to use MLTransform for embeddings-specific use cases.

I want to compute a vocabulary from a dataset
Compute a unique vocabulary from a dataset and then map each word or token to a distinct integer index. Use this transform to change textual data into numerical representations for machine learning tasks.
I want to scale my data to train my ML model
Scale your data so that you can use it to train your ML model. The Apache Beam MLTransform class includes multiple data scaling transforms.

For a full list of available transforms, see Transforms in the Apache Beam documentation.

Use MLTransform

To use the MLTransform class to preprocess data, include the following code in your pipeline:

  import apache_beam as beam
  from apache_beam.ml.transforms.base import MLTransform
  from apache_beam.ml.transforms.tft import TRANSFORM_NAME
  import tempfile

  data = [
      {
          DATA
      },
  ]

  artifact_location = gs://BUCKET_NAME
  TRANSFORM_FUNCTION_NAME = TRANSFORM_NAME(columns=['x'])

  with beam.Pipeline() as p:
    transformed_data = (
        p
        | beam.Create(data)
        | MLTransform(write_artifact_location=artifact_location).with_transform(
            TRANSFORM_FUNCTION_NAME)
        | beam.Map(print))

Replace the following values:

  • TRANSFORM_NAME: the name of the transform to use
  • BCUKET_NAME: the name of your Cloud Storage bucket
  • DATA: the input data to transform
  • TRANSFORM_FUNCTION_NAME: the name that you assign to your transform function in your code

What's next