Build a real-time vector embedding pipeline for AlloyDB with Dataflow

This document shows you how to create an AlloyDB for PostgreSQL Extract, Transform, Load (ETL) pipeline by using Dataflow. Dataflow is a fully managed Google Cloud service for developing and running data processing pipelines.

The instructions in this document are based on the Vector Embedding Ingestion with Apache Beam and AlloyDB Colab, which uses Python to create the basic_ingestion_pipeline.py ingestion pipeline. Use cases where you can apply the information in this document include semantic search and retrieval-augmented generation (RAG).

These instructions describe the following Dataflow pipeline components:

  • Setting up an AlloyDB and Dataflow connection
  • Generating embeddings in AlloyDB for PostgreSQL using the Apache Beam VertexAITextEmbeddings handler and the Vertex AI text embedding model
  • Creating a streaming pipeline in Dataflow

Before you begin

Before you create the Dataflow pipeline using the Colab, complete these prerequisites:

Set up your AlloyDB for PostgreSQL instance and pipeline components

First, configure your pipeline to connect to an AlloyDB for PostgreSQL instance. This configuration includes defining the host, database name, user, and password to construct the complete Java Database Connectivity (JDBC) connection URL. For more information about setting up the connection, see Database Setup.
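For example, you might collect these settings as variables and assemble the JDBC URL from them. The following is a minimal sketch; the variable names and placeholder values are illustrative, not taken from the Colab:

    # Illustrative connection settings; replace the placeholders with your
    # AlloyDB for PostgreSQL instance's host, database, and credentials.
    ALLOYDB_HOST = "10.0.0.2"      # typically the instance's private IP
    ALLOYDB_PORT = 5432
    ALLOYDB_DATABASE = "postgres"
    ALLOYDB_USERNAME = "postgres"
    ALLOYDB_PASSWORD = "your-password"

    # Standard PostgreSQL JDBC URL format used by the pipeline's writer.
    jdbc_url = f"jdbc:postgresql://{ALLOYDB_HOST}:{ALLOYDB_PORT}/{ALLOYDB_DATABASE}"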

The Retrieval-Augmented Generation (RAG)-specific Apache Beam modules provide classes for the following tasks:

  • Ingesting data from AlloyDB for PostgreSQL
  • Generating embeddings
  • Writing these vector embeddings back to AlloyDB for PostgreSQL

Import the required classes into your pipeline code before you build the pipeline logic. For more information about pipeline components, see Importing Pipeline Components.
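A minimal import block might look like the following sketch. The apache_beam.ml.rag module paths are assumptions based on the Colab and can change between Apache Beam releases, so verify them against your installed version:

    import apache_beam as beam

    # Core transform that applies an embedding handler to the data.
    from apache_beam.ml.transforms.base import MLTransform

    # RAG-specific types and transforms (module paths assumed from the Colab).
    from apache_beam.ml.rag.types import Chunk, Content
    from apache_beam.ml.rag.embeddings.vertex_ai import VertexAITextEmbeddings
    from apache_beam.ml.rag.ingestion.base import VectorDatabaseWriteTransform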

Create sample data

The Vector Embedding Ingestion with Apache Beam and AlloyDB Colab provides sample products_data data for running the pipeline. The pipeline uses this sample data as input, along with the embedding model, to generate embeddings.

For more information, see Create sample data.
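The sample records are plain Python dictionaries of product attributes. The following sketch shows the general shape; the field names and values are illustrative, not the Colab's exact products_data:

    # Illustrative sample records; the Colab defines its own products_data fields.
    products_data = [
        {"id": 1, "name": "Trail running shoes",
         "description": "Lightweight shoes with aggressive grip for muddy trails."},
        {"id": 2, "name": "Insulated water bottle",
         "description": "Keeps drinks cold for 24 hours and hot for 12."},
    ]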

Create a table to store embeddings

The pipeline stores the generated embeddings in the default_dataflow_product_embeddings table. For more information about creating the table schema, see Create table with default schema.
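Because the embeddings are stored in a pgvector column, the table pairs the source text and metadata with a vector column. The following DDL sketch, run through psycopg2, uses illustrative column names and a 768-dimension vector (the dimension depends on the embedding model you choose); it is not the Colab's exact default schema:

    import psycopg2

    # Illustrative schema; adjust the columns and vector dimension to match
    # your embedding model and the schema used in the Colab.
    CREATE_TABLE_SQL = """
    CREATE EXTENSION IF NOT EXISTS vector;
    CREATE TABLE IF NOT EXISTS default_dataflow_product_embeddings (
        id TEXT PRIMARY KEY,
        content TEXT,
        embedding vector(768),
        metadata JSONB
    );
    """

    with psycopg2.connect(
        host=ALLOYDB_HOST, dbname=ALLOYDB_DATABASE,
        user=ALLOYDB_USERNAME, password=ALLOYDB_PASSWORD,
    ) as conn:
        with conn.cursor() as cur:
            cur.execute(CREATE_TABLE_SQL)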

Optional: Prepare data for embedding ingestion

Depending on your dataset, you can split your data into metadata and the text that the embedding model converts to embeddings. The MLTransform() and VectorDatabaseWriteTransform() classes process the input data into chunks of a size that the embedding model supports. Include the metadata and format the input data according to the specifications of the embedding model that you use.

For more information about preparing data, see Map products data to chunks.
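One way to express this mapping is a small function that wraps each record in the Beam RAG Chunk type, keeping the identifier and remaining fields as metadata. This sketch reuses the Chunk and Content types imported earlier and assumes the illustrative product fields from the sample data above:

    # Illustrative mapping: the description becomes the text to embed, and the
    # other fields travel along as metadata.
    def product_to_chunk(product: dict) -> Chunk:
        return Chunk(
            id=str(product["id"]),
            content=Content(text=product["description"]),
            metadata={"name": product["name"]},
        )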

Configure the embedding handler to generate embeddings

The VertexAITextEmbeddings() class defines the text embedding model that creates vector embeddings. This embedding model converts the chunked data to embeddings.

For more information, see Configure Embedding Handler.
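As a sketch, the handler is constructed with the embedding model name and then applied through MLTransform inside the pipeline. The model name and constructor arguments below are assumptions; check the VertexAITextEmbeddings signature in your Apache Beam version:

    # Illustrative handler configuration; use a Vertex AI text embedding model
    # that is available in your project and region.
    embedding_handler = VertexAITextEmbeddings(
        model_name="text-embedding-005",
    )

    # Inside the pipeline, the handler is typically applied like this:
    #   ... | MLTransform(write_artifact_location=TEMP_GCS_PATH)
    #           .with_transform(embedding_handler)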

You can also use a pre-trained model created with the Hugging Face SentenceTransformers framework to generate vector embeddings. For more information, see Generate embeddings with HuggingFace.
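A hedged sketch of the Hugging Face alternative follows. The handler class name and module path are assumptions based on the Colab, and the SentenceTransformers model is downloaded at runtime:

    # Module path and class name assumed from the Colab; verify them in your
    # Apache Beam version before using this handler.
    from apache_beam.ml.rag.embeddings.huggingface import HuggingfaceTextEmbeddings

    huggingface_handler = HuggingfaceTextEmbeddings(
        model_name="sentence-transformers/all-MiniLM-L6-v2",  # example model
    )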

Create an ingestion pipeline

The basic_ingestion_pipeline.py pipeline, provided in the Vector Embedding Ingestion with Apache Beam and AlloyDB Colab, incorporates the configurations from the earlier sections, including AlloyDB for PostgreSQL setup, loading data to AlloyDB for PostgreSQL, optional data chunking, and embedding handler configuration.

The ingestion pipeline does the following:

  • Creates product data tables
  • Converts data to chunks
  • Generates embeddings
  • Writes the converted embeddings to the products_data table in AlloyDB for PostgreSQL

You can run this pipeline using a direct local runner or a cloud-based runner such as Dataflow.

For more information about creating the ingestion pipeline, see Save our Pipeline to a python file.
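Putting the earlier sketches together, the overall shape of the ingestion pipeline is roughly the following. The AlloyDB writer configuration is only referenced as a placeholder here, and the transform arguments are assumptions; follow the Colab for the exact basic_ingestion_pipeline.py code:

    # Rough shape of the ingestion pipeline, using the illustrative pieces from
    # the earlier sections. Running beam.Pipeline() without options uses the
    # local direct runner; Dataflow options are shown in the next section.
    TEMP_GCS_PATH = "gs://your-bucket/tmp"  # assumed location for MLTransform artifacts
    alloydb_writer_config = ...  # AlloyDB writer configuration, built as shown in the Colab

    with beam.Pipeline() as p:
        _ = (
            p
            | "Create products" >> beam.Create(products_data)
            | "Map to chunks" >> beam.Map(product_to_chunk)
            | "Generate embeddings" >> MLTransform(
                write_artifact_location=TEMP_GCS_PATH
            ).with_transform(embedding_handler)
            | "Write to AlloyDB" >> VectorDatabaseWriteTransform(alloydb_writer_config)
        )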

Run the Dataflow pipeline

You can run a Dataflow pipeline from the command line. Pass the required parameters, such as your project ID, AlloyDB for PostgreSQL connection details, Cloud Storage bucket location, execution environment details, network information, and the name of the ingestion pipeline file (basic_ingestion_pipeline.py).

In the Vector Embedding Ingestion with Apache Beam and AlloyDB Colab, the AlloyDB for PostgreSQL instance and Dataflow jobs run in the same VPC network and subnetwork.
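Those values map onto standard Dataflow pipeline options. The following sketch shows how they might be assembled in Python before constructing the pipeline; all values are placeholders, and the network and subnetwork must match the VPC used by your AlloyDB for PostgreSQL instance:

    from apache_beam.options.pipeline_options import PipelineOptions

    # Placeholder values; replace them with your project, region, bucket,
    # and the VPC network and subnetwork shared with AlloyDB.
    pipeline_options = PipelineOptions(
        runner="DataflowRunner",
        project="your-project-id",
        region="us-central1",
        temp_location="gs://your-bucket/tmp",
        network="your-vpc-network",
        subnetwork="regions/us-central1/subnetworks/your-subnet",
    )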

For more information about running a pipeline in Dataflow, see Run Pipeline on Dataflow.

While your pipeline runs, you can view execution graphs, logs, and metrics on the Dataflow dashboard in the Google Cloud console.

Optional: Run the streaming Dataflow pipeline

For data that changes often, such as in similarity search or recommendation engine use cases, consider creating a streaming pipeline using Dataflow and Pub/Sub.

Instead of processing a batch of data, this pipeline continuously reads incoming messages from a Pub/Sub topic, converts the messages into chunks, generates embeddings using a specified model (like Hugging Face or Vertex AI), and updates the AlloyDB for PostgreSQL table.

For more information, see Streaming Embeddings Updates from Pub/Sub.
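A minimal sketch of the streaming variant follows: it reads from a Pub/Sub subscription, parses each message into a chunk, and reuses the embedding handler and AlloyDB writer from the batch pipeline. The subscription name and JSON message format are assumptions:

    import json

    # Streaming sketch; the subscription, message format, and writer
    # configuration are assumptions -- adapt them to your environment.
    streaming_options = PipelineOptions(streaming=True)

    with beam.Pipeline(options=streaming_options) as p:
        _ = (
            p
            | "Read from Pub/Sub" >> beam.io.ReadFromPubSub(
                subscription="projects/your-project-id/subscriptions/products-sub"
            )
            | "Parse JSON" >> beam.Map(lambda msg: json.loads(msg.decode("utf-8")))
            | "Map to chunks" >> beam.Map(product_to_chunk)
            | "Generate embeddings" >> MLTransform(
                write_artifact_location=TEMP_GCS_PATH
            ).with_transform(embedding_handler)
            | "Write to AlloyDB" >> VectorDatabaseWriteTransform(alloydb_writer_config)
        )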

Verify vector embeddings in AlloyDB for PostgreSQL

After the pipeline executes, verify that the pipeline wrote the embeddings to your AlloyDB for PostgreSQL database.

For more information, see Verify the Written Embeddings.
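One simple check is to query the embeddings table and confirm that the vector column is populated. This sketch uses psycopg2 with the illustrative connection variables and table schema from earlier; adjust the names to match your actual table:

    # Spot-check a few rows written by the pipeline.
    with psycopg2.connect(
        host=ALLOYDB_HOST, dbname=ALLOYDB_DATABASE,
        user=ALLOYDB_USERNAME, password=ALLOYDB_PASSWORD,
    ) as conn:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT id, content, embedding IS NOT NULL AS has_embedding "
                "FROM default_dataflow_product_embeddings LIMIT 5;"
            )
            for row in cur.fetchall():
                print(row)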

What's next