This document shows you how to create an AlloyDB for PostgreSQL Extract, Transform, Load (ETL) pipeline using Dataflow. Dataflow is a fully managed Google Cloud service for developing and running data processing pipelines.
The instructions in this document are based on the Vector Embedding Ingestion with Apache Beam and AlloyDB Colab, which uses Python to create the basic_ingestion_pipeline.py ingestion pipeline. Use cases where you can apply the information in this document include semantic search and retrieval augmented generation (RAG).
These instructions describe the following Dataflow pipeline components:
- Setting up an AlloyDB for PostgreSQL and Dataflow connection
- Generating embeddings in AlloyDB for PostgreSQL using the Apache Beam VertexAITextEmbeddings handler and the Vertex AI text embedding model
- Creating a streaming pipeline in Dataflow
Before you begin
Before you create the Dataflow pipeline using the Colab, complete these prerequisites:
- Configure your environment to create a Dataflow pipeline.
- Enable the AlloyDB for PostgreSQL and other required APIs:

  ```
  gcloud services enable alloydb.googleapis.com cloudresourcemanager.googleapis.com \
      servicenetworking.googleapis.com
  ```

- Create an AlloyDB for PostgreSQL cluster and primary instance.
- Install the AlloyDB for PostgreSQL vector extension in your database.
- Grant the AlloyDB Admin (roles/alloydb.admin) role to the Dataflow user account.
Set up your AlloyDB for PostgreSQL instance and pipeline components
First, configure your pipeline to connect to an AlloyDB for PostgreSQL instance. This configuration includes defining the host, database name, user, and password to construct the complete Java Database Connectivity (JDBC) connection URL. For more information about setting up the connection, see Database Setup.
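The exact connection values come from your own instance. The following minimal sketch only shows how the pieces typically combine into a PostgreSQL JDBC URL; the host, database, user, and password values are placeholders, and the variable names are illustrative rather than the ones used in the Colab.

```python
# Minimal sketch: assemble the JDBC URL that the pipeline's AlloyDB writer uses.
# All values below are placeholders; in practice they come from your own
# AlloyDB for PostgreSQL instance.
ALLOYDB_HOST = "10.0.0.2"        # private IP of the AlloyDB primary instance
ALLOYDB_PORT = 5432
ALLOYDB_DATABASE = "postgres"
ALLOYDB_USER = "postgres"
ALLOYDB_PASSWORD = "your-password"

jdbc_url = f"jdbc:postgresql://{ALLOYDB_HOST}:{ALLOYDB_PORT}/{ALLOYDB_DATABASE}"
```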
The Retrieval-Augmented Generation (RAG)-specific Apache Beam modules provide classes for the following tasks:
- Ingesting data from AlloyDB for PostgreSQL
- Generating embeddings
- Writing these vector embeddings back to AlloyDB for PostgreSQL
Import the required classes into your pipeline code before you build the pipeline logic. For more information about pipeline components, see Importing Pipeline Components.
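The authoritative import list is in the Colab. The following sketch shows the general shape, assuming the classes named in this document come from Beam's apache_beam.ml packages; the RAG-specific module paths are an assumption about the apache_beam.ml.rag package layout, so confirm them against the Colab.

```python
import apache_beam as beam

# Embedding generation classes from Beam's ML transforms.
from apache_beam.ml.transforms.base import MLTransform
from apache_beam.ml.transforms.embeddings.vertex_ai import VertexAITextEmbeddings

# RAG-specific ingestion classes. These module paths are an assumption based on
# the apache_beam.ml.rag package layout; verify them against the Colab.
from apache_beam.ml.rag.types import Chunk
from apache_beam.ml.rag.ingestion.base import VectorDatabaseWriteTransform
from apache_beam.ml.rag.ingestion.alloydb import AlloyDBVectorWriterConfig
```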
Create sample data
The Vector Embedding Ingestion with Apache Beam and AlloyDB Colab provides sample products_data data for running the pipeline. The pipeline uses this sample data as input, along with the embedding model, to generate embeddings.
For more information, see Create sample data.
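The real sample records are defined in the Colab. The sketch below is illustrative only, and its field names (id, name, description) are placeholders rather than the Colab's actual schema; it shows the general shape of records that pair metadata with the text to be embedded.

```python
# Illustrative only: the real products_data sample lives in the Colab, and its
# field names may differ. Each record pairs an identifier and metadata with the
# text that the embedding model converts to a vector.
products_data = [
    {"id": 1, "name": "Trail running shoes",
     "description": "Lightweight shoes with aggressive grip for muddy trails."},
    {"id": 2, "name": "Insulated water bottle",
     "description": "Keeps drinks cold for 24 hours and hot for 12 hours."},
]
```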
Create a table to store embeddings
The pipeline stores the generated embeddings in the default_dataflow_product_embeddings table. For more information about creating the table schema, see Create table with default schema.
Optional: Prepare data for embedding ingestion
Based on your dataset, you can split your data into metadata and text that the embedding model must convert to embeddings. The MLTransform() and VectorDatabaseWriteTransform() classes process input data into chunks of a size that the embedding model supports. Include the metadata and format the input data according to the specifications of the embedding model that you are using.
For more information about preparing data, see Map products data to chunks.
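As a minimal sketch of the idea, using the hypothetical products_data fields from the earlier example: keep the fields you want to store as metadata, and isolate the text that is sent to the embedding model. The Colab maps each record into the Beam RAG chunk type; a plain dict stands in here, and the field names are assumptions.

```python
# Sketch of the chunk-mapping step. A plain dict stands in for the Beam RAG
# chunk type to show the split between stored metadata and the text that the
# embedding model converts to a vector.
def product_to_chunk(record):
    return {
        "id": record["id"],
        # Fields kept alongside the embedding, not embedded themselves.
        "metadata": {"name": record["name"]},
        # Text the embedding model converts to a vector.
        "content": record["description"],
    }

chunks = [product_to_chunk(r) for r in products_data]
```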
Configure the embedding handler to generate embeddings
The VertexAITextEmbeddings() class defines the text embedding model that creates vector embeddings. This embedding model converts the chunked data to embeddings.
For more information, see Configure Embedding Handler.
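A minimal sketch of configuring the handler is shown below, using the VertexAITextEmbeddings class from apache_beam.ml.transforms.embeddings.vertex_ai and the dict-shaped records from the earlier sketches. The model name and artifact location are placeholders, and the Colab's RAG pipeline may configure its handler with different parameters.

```python
import tempfile

from apache_beam.ml.transforms.base import MLTransform
from apache_beam.ml.transforms.embeddings.vertex_ai import VertexAITextEmbeddings

# Placeholder model name; the Colab may pin a different Vertex AI embedding model.
embedding_transform = VertexAITextEmbeddings(
    model_name="text-embedding-004",
    columns=["content"],   # the text field produced by the chunking step
)

# MLTransform applies the handler and stores its artifacts at the given location.
# On Dataflow, use a Cloud Storage path instead of a local temporary directory.
ml_transform = MLTransform(
    write_artifact_location=tempfile.mkdtemp()
).with_transform(embedding_transform)
```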
You can also use a pre-trained model created with the Hugging Face Sentence Transformers framework to generate vector embeddings. For more information, see Generate embeddings with HuggingFace.
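A minimal sketch of the Hugging Face alternative, using Beam's SentenceTransformerEmbeddings handler; the model name is a placeholder, and the Colab may use a different handler or checkpoint.

```python
from apache_beam.ml.transforms.embeddings.huggingface import SentenceTransformerEmbeddings

# Placeholder checkpoint; any Sentence Transformers text encoder can be used.
huggingface_embedder = SentenceTransformerEmbeddings(
    model_name="sentence-transformers/all-MiniLM-L6-v2",
    columns=["content"],
)
```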
Create an ingestion pipeline
The basic_ingestion_pipeline.py pipeline, provided in the Vector Embedding Ingestion with Apache Beam and AlloyDB Colab, incorporates the configurations from the earlier sections, including AlloyDB for PostgreSQL setup, loading data to AlloyDB for PostgreSQL, optional data chunking, and embedding handler configuration.
The ingestion pipeline does the following:
- Creates product data tables
- Converts data to chunks
- Generates embeddings
- Writes the converted embeddings to the products_data table in AlloyDB for PostgreSQL
You can run this pipeline using a direct local runner or a cloud-based runner such as Dataflow.
For more information about creating the ingestion pipeline, see Save our Pipeline to a python file.
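The full pipeline is defined in the Colab. The sketch below shows only its overall shape, reusing the hypothetical products_data, product_to_chunk, and ml_transform pieces from the earlier sketches; the AlloyDB write step is elided because its exact configuration (table name, JDBC URL, credentials) comes from the Colab.

```python
import apache_beam as beam

# Shape of the ingestion pipeline only. products_data, product_to_chunk, and
# ml_transform are the hypothetical pieces sketched in the earlier sections.
with beam.Pipeline() as p:
    _ = (
        p
        | "CreateProducts" >> beam.Create(products_data)   # load sample data
        | "MapToChunks" >> beam.Map(product_to_chunk)       # optional chunking
        | "GenerateEmbeddings" >> ml_transform              # embedding handler
        # | "WriteToAlloyDB" >> VectorDatabaseWriteTransform(...)  # configured in the Colab
    )
```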
Run the Dataflow pipeline
You can run a Dataflow pipeline from the command line. Pass the required parameters, such as your project ID, AlloyDB for PostgreSQL connection credentials, Cloud Storage bucket location, execution environment details, network information, and the name of the ingestion pipeline (basic_ingestion_pipeline.py).
In the Vector Embedding Ingestion with Apache Beam and AlloyDB Colab, the AlloyDB for PostgreSQL instance and Dataflow jobs run in the same VPC network and subnetwork.
For more information about running a pipeline in Dataflow, see Run Pipeline on Dataflow.
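The flag names that carry the AlloyDB for PostgreSQL connection details are defined by the Colab's pipeline script, so they are not reproduced here. As a minimal sketch, the standard Dataflow execution options you would otherwise pass on the command line can also be expressed in code through PipelineOptions, with placeholder project, bucket, and network values:

```python
from apache_beam.options.pipeline_options import PipelineOptions

# Standard Dataflow execution options; project, region, bucket, and network
# values are placeholders. Pipeline-specific flags (for example the AlloyDB
# connection details) are defined by basic_ingestion_pipeline.py itself.
options = PipelineOptions([
    "--runner=DataflowRunner",
    "--project=your-project-id",
    "--region=us-central1",
    "--temp_location=gs://your-bucket/tmp",
    "--network=your-vpc-network",
    "--subnetwork=regions/us-central1/subnetworks/your-subnet",
])
```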
While your pipeline runs, you can view execution graphs, logs, and metrics on the Dataflow dashboard in the Google Cloud console.
Optional: Run the streaming Dataflow pipeline
For data that is expected to change often, such as in similarity search or recommendation engine use cases, consider creating a streaming pipeline using Dataflow and Pub/Sub.
Instead of processing a batch of data, this pipeline continuously reads incoming messages from a Pub/Sub topic, converts the messages into chunks, generates embeddings using a specified model (like Hugging Face or Vertex AI), and updates the AlloyDB for PostgreSQL table.
For more information, see Streaming Embeddings Updates from Pub/Sub.
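A minimal sketch of the streaming variant is shown below, assuming a placeholder Pub/Sub topic and message format; the downstream chunking, embedding, and AlloyDB write steps are the same as in the batch pipeline and are elided here.

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Streaming variant sketch: read messages from Pub/Sub instead of a bounded
# in-memory sample. The topic name and message parsing are placeholders.
options = PipelineOptions(streaming=True)

with beam.Pipeline(options=options) as p:
    _ = (
        p
        | "ReadFromPubSub" >> beam.io.ReadFromPubSub(
            topic="projects/your-project-id/topics/your-topic")
        | "DecodeMessages" >> beam.Map(lambda msg: msg.decode("utf-8"))
        # ... chunking, embedding generation, and AlloyDB write steps ...
    )
```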
Verify vector embeddings in AlloyDB for PostgreSQL
After the pipeline executes, verify that the pipeline wrote the embeddings to your AlloyDB for PostgreSQL database.
For more information, see Verify the Written Embeddings.
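One way to spot-check the result is a row count against the embeddings table. The sketch below assumes you can reach the instance from your environment and uses the psycopg2 client, which is not part of the pipeline itself; the connection values are placeholders, and the table name assumes the default_dataflow_product_embeddings table described earlier.

```python
import psycopg2

# Spot-check sketch: connect to the AlloyDB instance and count stored rows.
# Connection values are placeholders; adjust the table name to match your schema.
conn = psycopg2.connect(
    host="10.0.0.2",
    dbname="postgres",
    user="postgres",
    password="your-password",
)
with conn, conn.cursor() as cur:
    cur.execute("SELECT COUNT(*) FROM default_dataflow_product_embeddings;")
    print("rows with embeddings:", cur.fetchone()[0])
conn.close()
```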
What's next
- Learn how to perform vector embedding ingestion with Apache Beam, Dataflow, and AlloyDB for PostgreSQL.