Migrar dados de um banco de dados de vetores para o AlloyDB


Este tutorial descreve como migrar dados de um banco de dados de vetor de terceiros para o AlloyDB para PostgreSQL usando uma VectorStore do LangChain. Neste tutorial, presumimos que os dados nos bancos de dados de vetores de terceiros foram criados usando uma integração do LangChain VectorStore. Se você colocar informações em um dos bancos de dados abaixo sem usar o LangChain, talvez seja necessário editar os scripts fornecidos abaixo para corresponder ao esquema dos seus dados. Os seguintes bancos de dados de vetores são compatíveis:

Neste tutorial, pressupomos que você já conhece o Google Cloud, o AlloyDB e a programação assíncrona do Python.

Objetivos

Este tutorial mostra como fazer o seguinte:

  • Extrair dados de um banco de dados de vetores.
  • Conecte-se ao AlloyDB.
  • Inicialize a tabela do AlloyDB.
  • Inicialize um objeto de armazenamento de vetores.
  • Execute o script de migração para inserir os dados.

Custos

Neste documento, você usará os seguintes componentes faturáveis do Google Cloud:

Para gerar uma estimativa de custo baseada na projeção de uso deste tutorial, use a calculadora de preços. Novos usuários do Google Cloud podem estar qualificados para uma avaliação gratuita.

Ao concluir as tarefas descritas neste documento, é possível evitar o faturamento contínuo excluindo os recursos criados. Saiba mais em Limpeza.

Antes de começar

Verifique se você tem uma das seguintes lojas de vetores de banco de dados de terceiros do LangChain:

Ativar o faturamento e as APIs necessárias

  1. No console do Google Cloud, na página do seletor de projetos, selecione ou crie um projetoGoogle Cloud .

    Acessar o seletor de projetos

  2. Verifique se o faturamento foi ativado para o projeto Google Cloud .

  3. Ative as APIs do Cloud necessárias para criar e se conectar ao AlloyDB para PostgreSQL.

    Ativar as APIs

    1. Na etapa Confirmar projeto, clique em Próxima para confirmar o nome do projeto em que você vai fazer mudanças.
    2. Na etapa Ativar APIs, clique em Ativar para ativar o seguinte:

      • API AlloyDB
      • API Compute Engine
      • API Service Networking

Funções exigidas

Para receber as permissões necessárias para concluir as tarefas neste tutorial, tenha os seguintes papéis de Identity and Access Management (IAM) que permitem a criação de tabelas e a inserção de dados:

  • Proprietário (roles/owner) ou editor (roles/editor)
  • Se o usuário não for proprietário ou editor, os seguintes papéis do IAM e privilégios do PostgreSQL serão necessários:

Se você quiser fazer a autenticação no seu banco de dados usando a autenticação do IAM em vez da autenticação integrada neste tutorial, use o notebook que mostra como usar o AlloyDB para PostgreSQL para armazenar embeddings de vetores com a classe AlloyDBVectorStore.

Criar um cluster e um usuário do AlloyDB

  1. Crie um cluster do AlloyDB e uma instância.
    • Ative o IP público para executar este tutorial de qualquer lugar. Se você estiver usando o IP particular, execute este tutorial na VPC.
  2. Crie ou selecione um usuário do banco de dados do AlloyDB.
    • Quando você cria a instância, um usuário postgres é criado com uma senha. Esse usuário tem permissões de superusuário.
    • Este tutorial usa a autenticação integrada para reduzir qualquer fricção de autenticação. A autenticação do IAM é possível usando o AlloyDBEngine.

Recuperar a amostra de código

  1. Clone o repositório para copiar o exemplo de código do GitHub:

    git clone https://github.com/googleapis/langchain-google-alloydb-pg-python.git
  2. Navegue até o diretório migrations:

    cd langchain-google-alloydb-pg-python/samples/migrations

Extrair dados de um banco de dados de vetores

  1. Criar um cliente.

    Pinecone

    from pinecone import Pinecone  # type: ignore
    
    pinecone_client = Pinecone(api_key=pinecone_api_key)
    pinecone_index = pinecone_client.Index(pinecone_index_name)

    Weaviate

    import weaviate
    
    # For a locally running weaviate instance, use `weaviate.connect_to_local()`
    weaviate_client = weaviate.connect_to_weaviate_cloud(
        cluster_url=weaviate_cluster_url,
        auth_credentials=weaviate.auth.AuthApiKey(weaviate_api_key),
    )

    Chroma

    from langchain_chroma import Chroma
    
    chromadb_client = Chroma(
        collection_name=chromadb_collection_name,
        embedding_function=embeddings_service,
        persist_directory=chromadb_path,
    )

    Qdrant

    from qdrant_client import QdrantClient
    
    qdrant_client = QdrantClient(path=qdrant_path)
    

    Milvus

    milvus_client = MilvusClient(uri=milvus_uri)
  2. Receba todos os dados do banco de dados.

    Pinecone

    Extraia IDs de vetores do índice do Pinecone:

    results = pinecone_index.list_paginated(
        prefix="", namespace=pinecone_namespace, limit=pinecone_batch_size
    )
    ids = [v.id for v in results.vectors]
    if ids:  # Prevents yielding an empty list.
        yield ids
    
    # Check BOTH pagination and pagination.next
    while results.pagination is not None and results.pagination.get("next") is not None:
        pagination_token = results.pagination.get("next")
        results = pinecone_index.list_paginated(
            prefix="",
            pagination_token=pagination_token,
            namespace=pinecone_namespace,
            limit=pinecone_batch_size,
        )
    
        # Extract and yield the next batch of IDs
        ids = [v.id for v in results.vectors]
        if ids:  # Prevents yielding an empty list.
            yield ids

    Em seguida, busque registros por ID no índice do Pinecone:

    import uuid
    
    # Iterate through the IDs and download their contents
    for ids in id_iterator:
        all_data = pinecone_index.fetch(ids=ids, namespace=pinecone_namespace)
        ids = []
        embeddings = []
        contents = []
        metadatas = []
    
        # Process each vector in the current batch
        for doc in all_data["vectors"].values():
            # You might need to update this data translation logic according to one or more of your field names
            if pinecone_id_column_name in doc:
                # pinecone_id_column_name stores the unqiue identifier for the content
                ids.append(doc[pinecone_id_column_name])
            else:
                # Generate a uuid if pinecone_id_column_name is missing in source
                ids.append(str(uuid.uuid4()))
            # values is the vector embedding of the content
            embeddings.append(doc["values"])
            # Check if pinecone_content_column_name exists in metadata before accessing
            if pinecone_content_column_name in doc.metadata:
                # pinecone_content_column_name stores the content which was encoded
                contents.append(str(doc.metadata[pinecone_content_column_name]))
                # Remove pinecone_content_column_name after processing
                del doc.metadata[pinecone_content_column_name]
            else:
                # Handle the missing pinecone_content_column_name field appropriately
                contents.append("")
            # metadata is the additional context
            metadatas.append(doc["metadata"])
    
        # Yield the current batch of results
        yield ids, contents, embeddings, metadatas

    Weaviate

    # Iterate through the IDs and download their contents
    weaviate_collection = weaviate_client.collections.get(weaviate_collection_name)
    ids: list[str] = []
    content: list[Any] = []
    embeddings: list[list[float]] = []
    metadatas: list[Any] = []
    
    for item in weaviate_collection.iterator(include_vector=True):
        # You might need to update this data translation logic according to one or more of your field names
        # uuid is the unqiue identifier for the content
        ids.append(str(item.uuid))
        # weaviate_text_key is the content which was encoded
        content.append(item.properties[weaviate_text_key])
        # vector is the vector embedding of the content
        embeddings.append(item.vector["default"])  # type: ignore
        del item.properties[weaviate_text_key]  # type: ignore
        # properties is the additional context
        metadatas.append(item.properties)
    
        if len(ids) >= weaviate_batch_size:
            # Yield the current batch of results
            yield ids, content, embeddings, metadatas
            # Reset lists to start a new batch
            ids = []
            content = []
            embeddings = []
            metadatas = []

    Chroma

    # Iterate through the IDs and download their contents
    offset = 0
    while True:
        # You might need to update this data translation logic according to one or more of your field names
        # documents is the content which was encoded
        # embeddings is the vector embedding of the content
        # metadatas is the additional context
        docs = chromadb_client.get(
            include=["metadatas", "documents", "embeddings"],
            limit=chromadb_batch_size,
            offset=offset,
        )
    
        if len(docs["documents"]) == 0:
            break
    
        # ids is the unqiue identifier for the content
        yield docs["ids"], docs["documents"], docs["embeddings"].tolist(), docs[
            "metadatas"
        ]
    
        offset += chromadb_batch_size
    

    Qdrant

    # Iterate through the IDs and download their contents
    offset = None
    while True:
        docs, offset = qdrant_client.scroll(
            collection_name=qdrant_collection_name,
            with_vectors=True,
            limit=qdrant_batch_size,
            offset=offset,
            with_payload=True,
        )
    
        ids: List[str] = []
        contents: List[Any] = []
        embeddings: List[List[float]] = []
        metadatas: List[Any] = []
    
        for doc in docs:
            if doc.payload and doc.vector:
                # You might need to update this data translation logic according to one or more of your field names
                # id is the unqiue identifier for the content
                ids.append(str(doc.id))
                # page_content is the content which was encoded
                contents.append(doc.payload["page_content"])
                # vector is the vector embedding of the content
                embeddings.append(doc.vector)  # type: ignore
                # metatdata is the additional context
                metadatas.append(doc.payload["metadata"])
    
        yield ids, contents, embeddings, metadatas
    
        if not offset:
            break
    

    Milvus

    # Iterate through the IDs and download their contents
    iterator = milvus_client.query_iterator(
        collection_name=milvus_collection_name,
        filter='pk >= "0"',
        output_fields=["pk", "text", "vector", "idv"],
        batch_size=milvus_batch_size,
    )
    
    while True:
        ids = []
        content = []
        embeddings = []
        metadatas = []
        page = iterator.next()
        if len(page) == 0:
            iterator.close()
            break
        for i in range(len(page)):
            # You might need to update this data translation logic according to one or more of your field names
            doc = page[i]
            # pk is the unqiue identifier for the content
            ids.append(doc["pk"])
            # text is the content which was encoded
            content.append(doc["text"])
            # vector is the vector embedding of the content
            embeddings.append(doc["vector"])
            del doc["pk"]
            del doc["text"]
            del doc["vector"]
            # doc is the additional context
            metadatas.append(doc)
        yield ids, content, embeddings, metadatas

Inicializar a tabela do AlloyDB

  1. Defina o serviço de incorporação.

    A interface VectorStore requer um serviço de incorporação. Esse fluxo de trabalho não gera novas embeddings. Portanto, a classe FakeEmbeddings é usada para evitar custos.

    Pinecone

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Weaviate

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Chroma

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Qdrant

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Milvus

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)
  2. Prepare a tabela do AlloyDB.

    1. Conecte-se ao AlloyDB usando uma conexão de IP público. Para mais informações, consulte Como especificar o tipo de endereço IP.

      Pinecone

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
          ip_type=IPTypes.PUBLIC,  # Optionally use IPTypes.PRIVATE
      )

      Weaviate

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
          ip_type=IPTypes.PUBLIC,
      )

      Chroma

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
          ip_type=IPTypes.PUBLIC,
      )

      Qdrant

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
          ip_type=IPTypes.PUBLIC,
      )

      Milvus

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
          ip_type=IPTypes.PUBLIC,
      )
    2. Crie uma tabela para copiar dados, caso ela ainda não exista.

      Pinecone

      from langchain_google_alloydb_pg import Column
      
      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
          # Customize the ID column types if not using the UUID data type
          # id_column=Column("langchain_id", "TEXT"),  # Default is Column("langchain_id", "UUID")
          # overwrite_existing=True,  # Drop the old table and Create a new vector store table
      )

      Weaviate

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
          # Customize the ID column types with `id_column` if not using the UUID data type
      )
      

      Chroma

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
          # Customize the ID column types with `id_column` if not using the UUID data type
      )

      Qdrant

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
          # Customize the ID column types with `id_column` if not using the UUID data type
      )

      Milvus

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
          # Customize the ID column types with `id_column` if not using the UUID data type
      )

Inicializar um objeto de armazenamento de vetores

Esse código adiciona metadados de incorporação de vetores adicionais à coluna langchain_metadata em um formato JSON. Para tornar a filtragem mais eficiente, organize esses metadados em colunas separadas. Para mais informações, consulte Criar uma loja de vetores personalizada.

  1. Para inicializar um objeto de armazenamento de vetores, execute o seguinte comando:

    Pinecone

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Weaviate

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Chroma

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Qdrant

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Milvus

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )
  2. Insira dados na tabela do AlloyDB:

    Pinecone

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Weaviate

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Chroma

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Qdrant

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Milvus

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

Executar o script de migração

  1. Configure o ambiente Python.

  2. Instale as dependências de amostra:

    pip install -r requirements.txt
  3. Execute a migração de exemplo.

    Pinecone

    python migrate_pinecone_vectorstore_to_alloydb.py

    Faça as seguintes substituições antes de executar o exemplo:

    • PINECONE_API_KEY: a chave da API Pinecone.
    • PINECONE_NAMESPACE: o namespace da Pinecone.
    • PINECONE_INDEX_NAME: o nome do índice do Pinecone.
    • PROJECT_ID: o ID do projeto;
    • REGION: a região em que o cluster do AlloyDB é implantado.
    • CLUSTER: o nome do cluster.
    • INSTANCE: o nome da instância.
    • DB_NAME: o nome do banco de dados.
    • DB_USER: o nome do usuário do banco de dados.
    • DB_PWD: a senha secreta do banco de dados.

    Weaviate

    python migrate_weaviate_vectorstore_to_alloydb.py

    Faça as seguintes substituições antes de executar o exemplo:

    • WEAVIATE_API_KEY: a chave da API Weaviate.
    • WEAVIATE_CLUSTER_URL: o URL do cluster do Weaviate.
    • WEAVIATE_COLLECTION_NAME: o nome da coletânea do Weaviate.
    • PROJECT_ID: o ID do projeto;
    • REGION: a região em que o cluster do AlloyDB é implantado.
    • CLUSTER: o nome do cluster.
    • INSTANCE: o nome da instância.
    • DB_NAME: o nome do banco de dados.
    • DB_USER: o nome do usuário do banco de dados.
    • DB_PWD: a senha secreta do banco de dados.

    Chroma

    python migrate_chromadb_vectorstore_to_alloydb.py

    Faça as seguintes substituições antes de executar o exemplo:

    • CHROMADB_PATH: o caminho do banco de dados do Chroma.
    • CHROMADB_COLLECTION_NAME: o nome da coleção de banco de dados do Chroma.
    • PROJECT_ID: o ID do projeto;
    • REGION: a região em que o cluster do AlloyDB é implantado.
    • CLUSTER: o nome do cluster.
    • INSTANCE: o nome da instância.
    • DB_NAME: o nome do banco de dados.
    • DB_USER: o nome do usuário do banco de dados.
    • DB_PWD: a senha secreta do banco de dados.

    Qdrant

    python migrate_qdrant_vectorstore_to_alloydb.py

    Faça as seguintes substituições antes de executar o exemplo:

    • QDRANT_PATH: o caminho do banco de dados do Qdrant.
    • QDRANT_COLLECTION_NAME: o nome da coleção do Qdrant.
    • PROJECT_ID: o ID do projeto;
    • REGION: a região em que o cluster do AlloyDB é implantado.
    • CLUSTER: o nome do cluster.
    • INSTANCE: o nome da instância.
    • DB_NAME: o nome do banco de dados.
    • DB_USER: o nome do usuário do banco de dados.
    • DB_PWD: a senha secreta do banco de dados.

    Milvus

    python migrate_milvus_vectorstore_to_alloydb.py

    Faça as seguintes substituições antes de executar o exemplo:

    • MILVUS_URI: o URI do Milvus.
    • MILVUS_COLLECTION_NAME: o nome da coleção Milvus.
    • PROJECT_ID: o ID do projeto;
    • REGION: a região em que o cluster do AlloyDB é implantado.
    • CLUSTER: o nome do cluster.
    • INSTANCE: o nome da instância.
    • DB_NAME: o nome do banco de dados.
    • DB_USER: o nome do usuário do banco de dados.
    • DB_PWD: a senha secreta do banco de dados.

    Uma migração bem-sucedida imprime registros semelhantes a estes sem erros:
    Migration completed, inserted all the batches of data to AlloyDB

  4. Abra o AlloyDB Studio para conferir os dados migrados. Para mais informações, consulte Gerenciar dados com o AlloyDB Studio.

Limpar

Para evitar cobranças na sua conta do Google Cloud pelos recursos usados no tutorial, exclua o projeto que os contém ou mantenha o projeto e exclua os recursos individuais.

  1. No console do Google Cloud, acesse a página Clusters.

    Acessar Clusters

  2. Na coluna Nome do recurso, clique no nome do cluster que você criou.

  3. Clique em Excluir cluster.

  4. Em Excluir cluster, insira o nome do cluster para confirmar que você quer excluir o cluster.

  5. Clique em Excluir.

    Se você criou uma conexão particular ao criar um cluster, exclua a conexão particular:

  6. Acesse a página de rede do console do Google Cloud e clique em Excluir rede VPC.

A seguir