Esegui la migrazione dei dati da un database vettoriale ad AlloyDB


Questo tutorial descrive come eseguire la migrazione dei dati da un database vettoriale di terze parti ad AlloyDB per PostgreSQL utilizzando un VectorStore LangChain. Questo tutorial presuppone che i dati nei database vettoriali di terze parti siano stati creati utilizzando un'integrazione di VectorStore di LangChain. Se inserisci informazioni in uno dei seguenti database senza utilizzare LangChain, potresti dover modificare gli script forniti di seguito in modo che corrispondano allo schema dei tuoi dati. Sono supportati i seguenti database di vettori:

Questo tutorial presuppone che tu abbia familiarità con Google Cloud, AlloyDB e la programmazione Python asincrona.

Obiettivi

Questo tutorial illustra come:

  • Estrai i dati da un database vettoriale esistente.
  • Connettiti ad AlloyDB.
  • Inizializza la tabella AlloyDB.
  • Inizializza un oggetto datastore vettoriale.
  • Esegui lo script di migrazione per inserire i dati.

Costi

In questo documento utilizzi i seguenti componenti fatturabili di Google Cloud:

Per generare una stima dei costi in base all'utilizzo previsto, utilizza il Calcolatore prezzi. I nuovi Google Cloud utenti potrebbero avere diritto a una prova gratuita.

Al termine delle attività descritte in questo documento, puoi evitare la fatturazione continua eliminando le risorse che hai creato. Per ulteriori informazioni, consulta la sezione Pulizia.

Prima di iniziare

Assicurati di avere uno dei seguenti vettori di database di terze parti di LangChain:

Abilita la fatturazione e le API richieste

  1. Nella console Google Cloud, nella pagina di selezione del progetto, seleziona o crea un Google Cloud progetto.

    Vai al selettore dei progetti

  2. Assicurati che la fatturazione sia attivata per il tuo Google Cloud progetto.

  3. Abilita le API Cloud necessarie per creare e connetterti ad AlloyDB per PostgreSQL.

    Abilita le API

    1. Nel passaggio Conferma progetto, fai clic su Avanti per confermare il nome del progetto a cui apporterai modifiche.
    2. Nel passaggio Abilita API, fai clic su Abilita per attivare quanto segue:

      • API AlloyDB
      • API Compute Engine
      • API Service Networking

Ruoli obbligatori

Per ottenere le autorizzazioni necessarie per completare le attività di questo tutorial, devi disporre dei seguenti ruoli IAM (Identity and Access Management) che consentono la creazione di tabelle e l'inserimento di dati:

Se vuoi autenticarti al database utilizzando l'autenticazione IAM anziché l'autenticazione integrata in questo tutorial, utilizza il notebook che mostra come utilizzare AlloyDB per PostgreSQL per archiviare gli incorporamenti vettoriali con la classe AlloyDBVectorStore.

Crea un cluster e un utente AlloyDB

  1. Crea un cluster AlloyDB e un'istanza.
    • Abilita l'IP pubblico per eseguire questo tutorial da qualsiasi luogo. Se utilizzi l'IP privato, devi eseguire questo tutorial all'interno della tua VPC.
  2. Crea o seleziona un utente del database AlloyDB.
    • Quando crei l'istanza, viene creato un utente postgres con una password. Questo utente dispone delle autorizzazioni super user.
    • Questo tutorial utilizza l'autenticazione integrata per ridurre eventuali problemi di autenticazione. L'autenticazione IAM è possibile utilizzando AlloyDBEngine.

Recuperare il codice di esempio

  1. Copia l'esempio di codice da GitHub clonando il repository:

    git clone https://github.com/googleapis/langchain-google-alloydb-pg-python.git
  2. Vai alla directory migrations:

    cd langchain-google-alloydb-pg-python/samples/migrations

Estrarre i dati da un database di vettori esistente

  1. Crea un client.

    Pinecone

    from pinecone import Pinecone  # type: ignore
    
    pinecone_client = Pinecone(api_key=pinecone_api_key)
    pinecone_index = pinecone_client.Index(pinecone_index_name)

    Weaviate

    import weaviate
    
    # For a locally running weaviate instance, use `weaviate.connect_to_local()`
    weaviate_client = weaviate.connect_to_weaviate_cloud(
        cluster_url=weaviate_cluster_url,
        auth_credentials=weaviate.auth.AuthApiKey(weaviate_api_key),
    )

    Chroma

    from langchain_chroma import Chroma
    
    chromadb_client = Chroma(
        collection_name=chromadb_collection_name,
        embedding_function=embeddings_service,
        persist_directory=chromadb_path,
    )

    Qdrant

    from qdrant_client import QdrantClient
    
    qdrant_client = QdrantClient(path=qdrant_path)
    

    Milvus

    milvus_client = MilvusClient(uri=milvus_uri)
  2. Recupera tutti i dati dal database.

    Pinecone

    Recupera gli ID vettore dall'indice Pinecone:

    results = pinecone_index.list_paginated(
        prefix="", namespace=pinecone_namespace, limit=pinecone_batch_size
    )
    ids = [v.id for v in results.vectors]
    if ids:  # Prevents yielding an empty list.
        yield ids
    
    # Check BOTH pagination and pagination.next
    while results.pagination is not None and results.pagination.get("next") is not None:
        pagination_token = results.pagination.get("next")
        results = pinecone_index.list_paginated(
            prefix="",
            pagination_token=pagination_token,
            namespace=pinecone_namespace,
            limit=pinecone_batch_size,
        )
    
        # Extract and yield the next batch of IDs
        ids = [v.id for v in results.vectors]
        if ids:  # Prevents yielding an empty list.
            yield ids

    Quindi recupera i record per ID dall'indice Pinecone:

    import uuid
    
    # Iterate through the IDs and download their contents
    for ids in id_iterator:
        all_data = pinecone_index.fetch(ids=ids, namespace=pinecone_namespace)
        ids = []
        embeddings = []
        contents = []
        metadatas = []
    
        # Process each vector in the current batch
        for doc in all_data["vectors"].values():
            # You might need to update this data translation logic according to one or more of your field names
            if pinecone_id_column_name in doc:
                # pinecone_id_column_name stores the unqiue identifier for the content
                ids.append(doc[pinecone_id_column_name])
            else:
                # Generate a uuid if pinecone_id_column_name is missing in source
                ids.append(str(uuid.uuid4()))
            # values is the vector embedding of the content
            embeddings.append(doc["values"])
            # Check if pinecone_content_column_name exists in metadata before accessing
            if pinecone_content_column_name in doc.metadata:
                # pinecone_content_column_name stores the content which was encoded
                contents.append(str(doc.metadata[pinecone_content_column_name]))
                # Remove pinecone_content_column_name after processing
                del doc.metadata[pinecone_content_column_name]
            else:
                # Handle the missing pinecone_content_column_name field appropriately
                contents.append("")
            # metadata is the additional context
            metadatas.append(doc["metadata"])
    
        # Yield the current batch of results
        yield ids, contents, embeddings, metadatas

    Weaviate

    # Iterate through the IDs and download their contents
    weaviate_collection = weaviate_client.collections.get(weaviate_collection_name)
    ids: list[str] = []
    content: list[Any] = []
    embeddings: list[list[float]] = []
    metadatas: list[Any] = []
    
    for item in weaviate_collection.iterator(include_vector=True):
        # You might need to update this data translation logic according to one or more of your field names
        # uuid is the unqiue identifier for the content
        ids.append(str(item.uuid))
        # weaviate_text_key is the content which was encoded
        content.append(item.properties[weaviate_text_key])
        # vector is the vector embedding of the content
        embeddings.append(item.vector["default"])  # type: ignore
        del item.properties[weaviate_text_key]  # type: ignore
        # properties is the additional context
        metadatas.append(item.properties)
    
        if len(ids) >= weaviate_batch_size:
            # Yield the current batch of results
            yield ids, content, embeddings, metadatas
            # Reset lists to start a new batch
            ids = []
            content = []
            embeddings = []
            metadatas = []

    Chroma

    # Iterate through the IDs and download their contents
    offset = 0
    while True:
        # You might need to update this data translation logic according to one or more of your field names
        # documents is the content which was encoded
        # embeddings is the vector embedding of the content
        # metadatas is the additional context
        docs = chromadb_client.get(
            include=["metadatas", "documents", "embeddings"],
            limit=chromadb_batch_size,
            offset=offset,
        )
    
        if len(docs["documents"]) == 0:
            break
    
        # ids is the unqiue identifier for the content
        yield docs["ids"], docs["documents"], docs["embeddings"].tolist(), docs[
            "metadatas"
        ]
    
        offset += chromadb_batch_size
    

    Qdrant

    # Iterate through the IDs and download their contents
    offset = None
    while True:
        docs, offset = qdrant_client.scroll(
            collection_name=qdrant_collection_name,
            with_vectors=True,
            limit=qdrant_batch_size,
            offset=offset,
            with_payload=True,
        )
    
        ids: List[str] = []
        contents: List[Any] = []
        embeddings: List[List[float]] = []
        metadatas: List[Any] = []
    
        for doc in docs:
            if doc.payload and doc.vector:
                # You might need to update this data translation logic according to one or more of your field names
                # id is the unqiue identifier for the content
                ids.append(str(doc.id))
                # page_content is the content which was encoded
                contents.append(doc.payload["page_content"])
                # vector is the vector embedding of the content
                embeddings.append(doc.vector)  # type: ignore
                # metatdata is the additional context
                metadatas.append(doc.payload["metadata"])
    
        yield ids, contents, embeddings, metadatas
    
        if not offset:
            break
    

    Milvus

    # Iterate through the IDs and download their contents
    iterator = milvus_client.query_iterator(
        collection_name=milvus_collection_name,
        filter='pk >= "0"',
        output_fields=["pk", "text", "vector", "idv"],
        batch_size=milvus_batch_size,
    )
    
    while True:
        ids = []
        content = []
        embeddings = []
        metadatas = []
        page = iterator.next()
        if len(page) == 0:
            iterator.close()
            break
        for i in range(len(page)):
            # You might need to update this data translation logic according to one or more of your field names
            doc = page[i]
            # pk is the unqiue identifier for the content
            ids.append(doc["pk"])
            # text is the content which was encoded
            content.append(doc["text"])
            # vector is the vector embedding of the content
            embeddings.append(doc["vector"])
            del doc["pk"]
            del doc["text"]
            del doc["vector"]
            # doc is the additional context
            metadatas.append(doc)
        yield ids, content, embeddings, metadatas

Inizializza la tabella AlloyDB

  1. Definisci il servizio di embedding.

    L'interfaccia VectorStore richiede un servizio di embedding. Questo flusso di lavoro non genera nuovi embedding, pertanto viene utilizzata la classe FakeEmbeddings per evitare costi.

    Pinecone

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Weaviate

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Chroma

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Qdrant

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Milvus

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)
  2. Prepara la tabella AlloyDB.

    1. Connettiti ad AlloyDB utilizzando una connessione IP pubblico. Per ulteriori informazioni, consulta Specificare il tipo di indirizzo IP.

      Pinecone

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
          ip_type=IPTypes.PUBLIC,  # Optionally use IPTypes.PRIVATE
      )

      Weaviate

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
          ip_type=IPTypes.PUBLIC,
      )

      Chroma

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
          ip_type=IPTypes.PUBLIC,
      )

      Qdrant

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
          ip_type=IPTypes.PUBLIC,
      )

      Milvus

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
          ip_type=IPTypes.PUBLIC,
      )
    2. Crea una tabella in cui copiare i dati, se non esiste già.

      Pinecone

      from langchain_google_alloydb_pg import Column
      
      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
          # Customize the ID column types if not using the UUID data type
          # id_column=Column("langchain_id", "TEXT"),  # Default is Column("langchain_id", "UUID")
          # overwrite_existing=True,  # Drop the old table and Create a new vector store table
      )

      Weaviate

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
          # Customize the ID column types with `id_column` if not using the UUID data type
      )
      

      Chroma

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
          # Customize the ID column types with `id_column` if not using the UUID data type
      )

      Qdrant

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
          # Customize the ID column types with `id_column` if not using the UUID data type
      )

      Milvus

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
          # Customize the ID column types with `id_column` if not using the UUID data type
      )

Inizializzare un oggetto del datastore vettoriale

Questo codice aggiunge ulteriori metadati di embedding di vettori alla colonna langchain_metadata in formato JSON. Per rendere il filtro più efficiente, organizza questi metadati in colonne separate. Per ulteriori informazioni, vedi Creare un negozio di vektori personalizzato.

  1. Per inizializzare un oggetto dello spazio vettoriale, esegui il seguente comando:

    Pinecone

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Weaviate

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Chroma

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Qdrant

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Milvus

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )
  2. Inserisci i dati nella tabella AlloyDB:

    Pinecone

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Weaviate

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Chroma

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Qdrant

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Milvus

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

Esegui lo script di migrazione

  1. Configura l'ambiente Python.

  2. Installa le dipendenze di esempio:

    pip install -r requirements.txt
  3. Esegui la migrazione di esempio.

    Pinecone

    python migrate_pinecone_vectorstore_to_alloydb.py

    Apporta le seguenti sostituzioni prima di eseguire l'esempio:

    • PINECONE_API_KEY: la chiave API Pinecone.
    • PINECONE_NAMESPACE: lo spazio dei nomi Pinecone.
    • PINECONE_INDEX_NAME: il nome dell'indice Pinecone.
    • PROJECT_ID: l'ID progetto.
    • REGION: la regione in cui è dipiegato il cluster AlloyDB.
    • CLUSTER: il nome del cluster.
    • INSTANCE: il nome dell'istanza.
    • DB_NAME: il nome del database.
    • DB_USER: il nome dell'utente del database.
    • DB_PWD: la password del secret del database.

    Weaviate

    python migrate_weaviate_vectorstore_to_alloydb.py

    Apporta le seguenti sostituzioni prima di eseguire l'esempio:

    • WEAVIATE_API_KEY: la chiave API Weaviate.
    • WEAVIATE_CLUSTER_URL: l'URL del cluster Weaviate.
    • WEAVIATE_COLLECTION_NAME: il nome della raccolta Weaviate.
    • PROJECT_ID: l'ID progetto.
    • REGION: la regione in cui è dipiegato il cluster AlloyDB.
    • CLUSTER: il nome del cluster.
    • INSTANCE: il nome dell'istanza.
    • DB_NAME: il nome del database.
    • DB_USER: il nome dell'utente del database.
    • DB_PWD: la password del secret del database.

    Chroma

    python migrate_chromadb_vectorstore_to_alloydb.py

    Apporta le seguenti sostituzioni prima di eseguire l'esempio:

    • CHROMADB_PATH: il percorso del database Chroma.
    • CHROMADB_COLLECTION_NAME: il nome della raccolta del database Chroma.
    • PROJECT_ID: l'ID progetto.
    • REGION: la regione in cui è dipiegato il cluster AlloyDB.
    • CLUSTER: il nome del cluster.
    • INSTANCE: il nome dell'istanza.
    • DB_NAME: il nome del database.
    • DB_USER: il nome dell'utente del database.
    • DB_PWD: la password del secret del database.

    Qdrant

    python migrate_qdrant_vectorstore_to_alloydb.py

    Apporta le seguenti sostituzioni prima di eseguire l'esempio:

    • QDRANT_PATH: il percorso del database Qdrant.
    • QDRANT_COLLECTION_NAME: il nome della raccolta Qdrant.
    • PROJECT_ID: l'ID progetto.
    • REGION: la regione in cui è dipiegato il cluster AlloyDB.
    • CLUSTER: il nome del cluster.
    • INSTANCE: il nome dell'istanza.
    • DB_NAME: il nome del database.
    • DB_USER: il nome dell'utente del database.
    • DB_PWD: la password del secret del database.

    Milvus

    python migrate_milvus_vectorstore_to_alloydb.py

    Apporta le seguenti sostituzioni prima di eseguire l'esempio:

    • MILVUS_URI: l'URI di Milvus.
    • MILVUS_COLLECTION_NAME: il nome della raccolta Milvus.
    • PROJECT_ID: l'ID progetto.
    • REGION: la regione in cui è dipiegato il cluster AlloyDB.
    • CLUSTER: il nome del cluster.
    • INSTANCE: il nome dell'istanza.
    • DB_NAME: il nome del database.
    • DB_USER: il nome dell'utente del database.
    • DB_PWD: la password del secret del database.

    Una migrazione riuscita stampa log simili al seguente senza errori:
    Migration completed, inserted all the batches of data to AlloyDB

  4. Apri AlloyDB Studio per visualizzare i dati di cui è stata eseguita la migrazione. Per ulteriori informazioni, consulta Gestire i dati utilizzando AlloyDB Studio.

Esegui la pulizia

Per evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse utilizzate in questo tutorial, elimina il progetto che contiene le risorse oppure mantieni il progetto ed elimina le singole risorse.

  1. Nella console Google Cloud, vai alla pagina Cluster.

    Vai a Cluster

  2. Nella colonna Nome risorsa, fai clic sul nome del cluster che hai creato.

  3. Fai clic su Elimina cluster.

  4. In Elimina cluster, inserisci il nome del cluster per confermare che vuoi eliminarlo.

  5. Fai clic su Elimina.

    Se hai creato una connessione privata quando hai creato un cluster, eliminala:

  6. Vai alla pagina Networking della console Google Cloud e fai clic su Elimina rete VPC.

Passaggi successivi