Questo modello crea una pipeline di streaming per trasmettere i record delle modifiche dei dati di Bigtable e scriverli in Vertex AI Vector Search utilizzando Dataflow Runner V2.
Requisiti della pipeline
- L'istanza di origine Bigtable deve esistere.
- La tabella di origine Bigtable deve esistere e deve avere gli modifiche in tempo reale abilitati.
- Deve esistere il profilo dell'applicazione Bigtable.
- Il percorso dell'indice Vector Search deve esistere.
Parametri del modello
Parametro | Descrizione |
---|---|
embeddingColumn |
Il nome completo della colonna in cui sono archiviati gli embedding. Nel formato cf:col. |
embeddingByteSize |
La dimensione in byte di ogni voce nell'array di incorporamenti. Utilizza 4 per float e 8 per double. Il valore predefinito è 4 . |
vectorSearchIndex |
L'indice Vector Search in cui verranno trasmessi in streaming le modifiche, nel formato
"projects/{projectID}/locations/{region}/indexes/{indexID}" (senza spazi iniziali o finali). Ad esempio:
projects/123/locations/us-east1/indexes/456 . |
bigtableChangeStreamAppProfile |
Il profilo dell'applicazione utilizzato per distinguere i carichi di lavoro in Bigtable. |
bigtableReadInstanceId |
L'ID dell'istanza Bigtable che contiene la tabella. |
bigtableReadTableId |
La tabella Bigtable da cui leggere. |
bigtableMetadataTableTableId |
(Facoltativo) ID della tabella dei metadati creata. Se non è impostato, Bigtable genera un ID. |
crowdingTagColumn |
(Facoltativo) Il nome completo della colonna in cui è memorizzato il tag di affollamento, nel formato cf:col . |
allowRestrictsMappings |
(Facoltativo) I nomi di colonna completi separati da virgole delle colonne da utilizzare come restrizioni allow , più
i relativi alias. Ogni nome di colonna deve essere nel formato cf:col->alias . |
denyRestrictsMappings |
(Facoltativo) I nomi di colonna completi separati da virgole delle colonne da utilizzare come restrizioni deny , più
i relativi alias. Ogni nome di colonna deve essere nel formato cf:col->alias . |
intNumericRestrictsMappings |
(Facoltativo) I nomi di colonna completi separati da virgole delle colonne da utilizzare come numeri interi
numeric_restricts , più
i relativi alias. Ogni nome di colonna deve essere nel formato cf:col->alias . |
floatNumericRestrictsMappings |
(Facoltativo) I nomi delle colonne completi e separati da virgole delle colonne da utilizzare come float (4 byte)
numeric_restricts , più
i relativi alias. Ogni nome colonna deve essere nel formato cf:col->alias |
doubleNumericRestrictsMappings |
(Facoltativo) I nomi delle colonne completamente qualificati e separati da virgole da utilizzare come double (8 byte)
numeric_restricts , più
i relativi alias. Ogni nome colonna deve essere nel formato cf:col->alias |
upsertMaxBatchSize |
(Facoltativo) Il numero massimo di upsert da memorizzare nel buffer prima di eseguire l'upsert del batch nell'indice di Vector Search. I batch vengono inviati quando
sono pronti upsertBatchSize record.
Esempio: 10 . |
upsertMaxBufferDuration |
(Facoltativo) Il ritardo massimo prima che un batch di upsert venga inviato a Vector Search. I batch vengono inviati quando sono pronti
upsertBatchSize record. I formati consentiti sono:
Ns per i secondi (ad esempio: 5s), Nm per i
minuti (ad esempio: 12m) e Nh per le ore (ad esempio:
2h). Predefinito: 10s . |
deleteMaxBatchSize |
(Facoltativo) Il numero massimo di eliminazioni da memorizzare nel buffer prima di
eliminare il batch dall'indice Vector Search.
I batch vengono inviati quando sono pronti
deleteBatchSize record.
Ad esempio: 10 . |
deleteMaxBufferDuration |
(Facoltativo) Il ritardo massimo prima che un batch di eliminazioni venga inviato
a Vector Search. I batch vengono inviati quando sono pronti
deleteBatchSize record. I formati consentiti
sono: Ns per i secondi (ad esempio, 5s), Nm per
i minuti (ad esempio, 12m) e Nh per le ore (ad esempio,
2h). Predefinito: 10s . |
dlqDirectory |
(Facoltativo) Il percorso in cui archiviare i record non elaborati con il motivo per cui non è stato possibile elaborarli. Il valore predefinito è una directory nella posizione temporanea del job Dataflow. Il valore predefinito è appropriato per la maggior parte degli scenari. |
bigtableChangeStreamMetadataInstanceId |
(Facoltativo) L'istanza Bigtable da utilizzare per la tabella dei metadati del connettore di modifiche in tempo reale. Il valore predefinito è vuoto. |
bigtableChangeStreamMetadataTableTableId |
(Facoltativo) L'ID della tabella dei metadati del connettore delle modifiche in tempo reale di Bigtable da utilizzare. Se non viene fornita, una tabella dei metadati del connettore Bigtable modifiche in tempo reale viene creata automaticamente durante il flusso della pipeline. Il valore predefinito è vuoto. |
bigtableChangeStreamCharset |
(Facoltativo) Nome del set di caratteri dei modifiche in tempo reale Bigtable durante la lettura di valori e qualificatori di colonne. Il valore predefinito è UTF-8. |
bigtableChangeStreamStartTimestamp |
(Facoltativo) La data e l'ora di inizio, inclusa, da utilizzare per la lettura dei modifiche in tempo reale (https://tools.ietf.org/html/rfc3339). Ad esempio, 2022-05-05T07:59:59Z. Il valore predefinito è il timestamp dell'avvio della pipeline. |
bigtableChangeStreamIgnoreColumnFamilies |
(Facoltativo) Un elenco separato da virgole dei nomi delle famiglia di colonne le cui modifiche non verranno acquisite. Il valore predefinito è vuoto. |
bigtableChangeStreamIgnoreColumns |
(Facoltativo) Un elenco separato da virgole di nomi di colonne le cui modifiche non verranno acquisite. Il valore predefinito è vuoto. |
bigtableChangeStreamName |
(Facoltativo) Un nome univoco per la pipeline del client. Questo parametro consente di riprendere l'elaborazione dal punto in cui si è interrotta una pipeline in esecuzione in precedenza. Il valore predefinito è un nome generato automaticamente. Consulta i log dei job Dataflow per il valore utilizzato. |
bigtableChangeStreamResume |
(Facoltativo) Se impostato su true, una nuova pipeline riprende
l'elaborazione dal punto in cui si è interrotta una pipeline
con lo stesso nome in esecuzione in precedenza. Se una pipeline con quel
nome non è mai stata eseguita in passato, la nuova pipeline non viene avviata.
Utilizza il parametro Se impostato su false, viene avviata una nuova pipeline. Se una pipeline
con lo stesso nome di Il valore predefinito è false. |
bigtableReadProjectId |
(Facoltativo) Progetto da cui leggere i dati Bigtable. Il valore predefinito di questo parametro è il progetto in cui è in esecuzione la pipeline Dataflow. |
Esegui il modello
Console
- Vai alla pagina Crea job da modello di Dataflow. Vai a Crea job da modello
- Nel campo Nome job, inserisci un nome univoco per il job.
- (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione
predefinita è
us-central1
.Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.
- Dal menu a discesa Modello di dataflow, seleziona the Bigtable Change Streams to Vector Search template.
- Nei campi dei parametri forniti, inserisci i valori dei parametri.
- Fai clic su Esegui job.
Interfaccia a riga di comando gcloud
Nella shell o nel terminale, esegui il modello:
gcloud dataflow flex-template run JOB_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search \ --project=PROJECT_ID \ --region=REGION_NAME \ --parameters \ embeddingColumn=EMBEDDING_COLUMN,\ embeddingByteSize=EMBEDDING_BYTE_SIZE,\ vectorSearchIndex=VECTOR_SEARCH_INDEX,\ bigtableChangeStreamAppProfile=BIGTABLE_CHANGE_STREAM_APP_PROFILE,\ bigtableReadInstanceId=BIGTABLE_READ_INSTANCE_ID,\ bigtableReadTableId=BIGTABLE_READ_TABLE_ID,\
Sostituisci quanto segue:
JOB_NAME
: un nome univoco del job a tua sceltaVERSION
: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latest
per utilizzare l'ultima versione del modello, disponibile nella cartella principale senza data nel bucket: gs://dataflow-templates-REGION_NAME/latest/- il nome della versione, ad esempio
2023-09-12-00_RC00
, per utilizzare una versione specifica del modello, che si trova nidificata nella rispettiva cartella principale con data nel bucket: gs://dataflow-templates-REGION_NAME/
REGION_NAME
: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempious-central1
EMBEDDING_COLUMN
: la colonna EmbeddingEMBEDDING_BYTE_SIZE
: la dimensione in byte dell'array di incorporamenti. Può essere 4 o 8.VECTOR_SEARCH_INDEX
: il percorso dell'indice Vector SearchBIGTABLE_CHANGE_STREAM_APP_PROFILE
: l'ID del profilo di applicazione BigtableBIGTABLE_READ_INSTANCE_ID
: l'ID istanza Bigtable di origineBIGTABLE_READ_TABLE_ID
: l'ID tabella Bigtable di origine
API
Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launchParameter": { "jobName": "JOB_NAME", "parameters": { "embeddingColumn": "EMBEDDING_COLUMN", "embeddingByteSize": "EMBEDDING_BYTE_SIZE", "vectorSearchIndex": "VECTOR_SEARCH_INDEX", "bigtableChangeStreamAppProfile": "BIGTABLE_CHANGE_STREAM_APP_PROFILE", "bigtableReadInstanceId": "BIGTABLE_READ_INSTANCE_ID", "bigtableReadTableId": "BIGTABLE_READ_TABLE_ID", }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search", "environment": { "maxWorkers": "10" } } }
Sostituisci quanto segue:
PROJECT_ID
: l'ID progetto Google Cloud in cui vuoi eseguire il job DataflowJOB_NAME
: un nome univoco del job a tua sceltaVERSION
: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latest
per utilizzare l'ultima versione del modello, disponibile nella cartella principale senza data nel bucket: gs://dataflow-templates-REGION_NAME/latest/- il nome della versione, ad esempio
2023-09-12-00_RC00
, per utilizzare una versione specifica del modello, che si trova nidificata nella rispettiva cartella principale con data nel bucket: gs://dataflow-templates-REGION_NAME/
LOCATION
: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempious-central1
EMBEDDING_COLUMN
: la colonna EmbeddingEMBEDDING_BYTE_SIZE
: la dimensione in byte dell'array di incorporamenti. Può essere 4 o 8.VECTOR_SEARCH_INDEX
: il percorso dell'indice Vector SearchBIGTABLE_CHANGE_STREAM_APP_PROFILE
: l'ID del profilo di applicazione BigtableBIGTABLE_READ_INSTANCE_ID
: l'ID istanza Bigtable di origineBIGTABLE_READ_TABLE_ID
: l'ID tabella Bigtable di origine