此模板会创建一个流处理流水线,以使用 Dataflow Runner V2 流式传输 Bigtable 数据更改记录并将其写入 Vertex AI Vector Search。
流水线要求
- Bigtable 源实例必须存在。
- Bigtable 源表必须存在,并且该表必须启用变更数据流。
- Bigtable 应用配置文件必须存在。
- Vector Search 索引路径必须存在。
模板参数
必需参数
- embeddingColumn:在其中存储嵌入的完全限定列名。格式为 cf:col。
- embeddingByteSize:嵌入数组中每个条目的字节大小。对于浮点数,请使用 4;对于双精度数,请使用 8。默认值为 4。
- vectorSearchIndex:将流式传输更改的向量搜索索引,格式为“projects/{projectID}/locations/{region}/indexes/{indexID}”(不含开头或结尾空格),例如
projects/123/locations/us-east1/indexes/456
。 - bigtableChangeStreamAppProfile:Bigtable 应用配置文件 ID。应用配置文件必须使用单集群路由并允许单行事务。
- bigtableReadInstanceId:源 Bigtable 实例 ID。
- bigtableReadTableId:源 Bigtable 表 ID。
可选参数
- bigtableMetadataTableTableId:用于创建元数据表的表 ID。
- crowdingTagColumn:在其中存储拥挤标记的完全限定列名。格式为 cf:col。
- allowRestrictsMappings:应作为
allow
限制使用的列的以逗号分隔完全限定列名及其别名。格式为 cf:col->alias。 - denyRestrictsMappings:应作为
deny
限制使用的列的以逗号分隔的完全限定列名及其别名。格式为 cf:col->alias。 - intNumericRestrictsMappings:应作为整数
numeric_restricts
使用的列的以逗号分隔完全限定列名及其别名。格式为 cf:col->alias。 - floatNumericRestrictsMappings:应作为浮点数(4 字节)
numeric_restricts
使用的列的以逗号分隔完全限定列名及其别名。格式为 cf:col->alias。 - doubleNumericRestrictsMappings:应作为双精度数(8 字节)
numeric_restricts
使用的列的以逗号分隔的完全限定列名称及其别名。格式为 cf:col->alias。 - upsertMaxBatchSize:在将批次更新/插入向量搜索索引之前要缓冲的更新/插入数量上限。当有 upsertBatchSize 条记录准备就绪时,或者任何记录等待 upsertBatchDelay 时间已过时,系统会发送批量数据。例如
10
。默认值为:10。 - upsertMaxBufferDuration:在将一批 upsert 操作发送到向量搜索之前的最长延迟时间。当有 upsertBatchSize 条记录准备就绪时,或者任何记录等待 upsertBatchDelay 时间已过时,系统会发送批量数据。允许的格式为 Ns(以秒为单位,例如 5s)、Nm(以分钟为单位,例如 12m)、Nh(以小时为单位,例如 2h)。例如
10s
。默认值为 10s。 - deleteMaxBatchSize:从向量搜索索引中删除批次之前要缓冲的最大删除次数。当有 deleteBatchSize 条记录准备就绪时,或者任何记录等待 deleteBatchDelay 时间已过时,系统会发送批量数据。例如
10
。默认值为:10。 - deleteMaxBufferDuration:将一批删除操作发送到向量搜索之前的最长延迟时间。当有 deleteBatchSize 条记录准备就绪时,或者任何记录等待 deleteBatchDelay 时间已过时,系统会发送批量数据。允许的格式为 Ns(以秒为单位,例如 5s)、Nm(以分钟为单位,例如 12m)、Nh(以小时为单位,例如 2h)。例如
10s
。默认值为 10s。 - dlqDirectory:用于存储未处理记录以及无法处理原因的路径。默认值为 Dataflow 作业的临时位置下的目录。在大多数情况下,默认值就可以了。
- bigtableChangeStreamMetadataInstanceId:Bigtable 变更数据流元数据实例 ID。默认值为空。
- bigtableChangeStreamMetadataTableTableId:Bigtable 变更数据流连接器元数据表的 ID。如果未提供,系统会在流水线执行期间自动创建 Bigtable 变更数据流连接器元数据表。默认值为空。
- bigtableChangeStreamCharset:Bigtable 变更数据流字符集名称。默认为 UTF-8。
- bigtableChangeStreamStartTimestamp:用于读取变更数据流的起始时间戳 (https://tools.ietf.org/html/rfc3339)(含边界值)。例如
2022-05-05T07:59:59Z
。默认为流水线开始时间的时间戳。 - bigtableChangeStreamIgnoreColumnFamilies:要忽略的列族名称更改的逗号分隔列表。默认值为空。
- bigtableChangeStreamIgnoreColumns:要忽略的列名称更改的逗号分隔列表。示例:“cf1:col1,cf2:col2”。默认值为空。
- bigtableChangeStreamName:客户端流水线的唯一名称。允许您从之前运行的流水线停止的位置继续处理。默认为自动生成的名称。如需了解所用的值,请参阅 Dataflow 作业日志。
- bigtableChangeStreamResume:设置为
true
时,新流水线将从具有相同bigtableChangeStreamName
值的先前运行的流水线停止时的点开始处理。如果具有给定bigtableChangeStreamName
值的流水线从未运行,则新流水线不会启动。设置为false
时,新流水线会启动。如果给定来源已运行具有相同bigtableChangeStreamName
值的流水线,则新流水线无法启动。默认值为false
。 - bigtableReadChangeStreamTimeoutMs:Bigtable ReadChangeStream 请求的超时(以毫秒为单位)。
- bigtableReadProjectId:Bigtable 项目 ID。默认为 Dataflow 作业的项目。
运行模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the Bigtable Change Streams to Vector Search template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud CLI
在 shell 或终端中,运行模板:
gcloud dataflow flex-template run JOB_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search \ --project=PROJECT_ID \ --region=REGION_NAME \ --parameters \ embeddingColumn=EMBEDDING_COLUMN,\ embeddingByteSize=EMBEDDING_BYTE_SIZE,\ vectorSearchIndex=VECTOR_SEARCH_INDEX,\ bigtableChangeStreamAppProfile=BIGTABLE_CHANGE_STREAM_APP_PROFILE,\ bigtableReadInstanceId=BIGTABLE_READ_INSTANCE_ID,\ bigtableReadTableId=BIGTABLE_READ_TABLE_ID,\
替换以下内容:
JOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates-REGION_NAME/latest/) 中可用- 版本名称(如
2023-09-12-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates-REGION_NAME/) 中
REGION_NAME
:要在其中部署 Dataflow 作业的区域,例如us-central1
EMBEDDING_COLUMN
:嵌入列EMBEDDING_BYTE_SIZE
:嵌入数组的字节大小。可以是 4 或 8。VECTOR_SEARCH_INDEX
:Vector Search 索引路径BIGTABLE_CHANGE_STREAM_APP_PROFILE
:Bigtable 应用配置文件 IDBIGTABLE_READ_INSTANCE_ID
:源 Bigtable 实例 IDBIGTABLE_READ_TABLE_ID
:源 Bigtable 表 ID
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launchParameter": { "jobName": "JOB_NAME", "parameters": { "embeddingColumn": "EMBEDDING_COLUMN", "embeddingByteSize": "EMBEDDING_BYTE_SIZE", "vectorSearchIndex": "VECTOR_SEARCH_INDEX", "bigtableChangeStreamAppProfile": "BIGTABLE_CHANGE_STREAM_APP_PROFILE", "bigtableReadInstanceId": "BIGTABLE_READ_INSTANCE_ID", "bigtableReadTableId": "BIGTABLE_READ_TABLE_ID", }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search", "environment": { "maxWorkers": "10" } } }
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Google Cloud 项目 IDJOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates-REGION_NAME/latest/) 中可用- 版本名称(如
2023-09-12-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates-REGION_NAME/) 中
LOCATION
:要在其中部署 Dataflow 作业的区域,例如us-central1
EMBEDDING_COLUMN
:嵌入列EMBEDDING_BYTE_SIZE
:嵌入数组的字节大小。可以是 4 或 8。VECTOR_SEARCH_INDEX
:Vector Search 索引路径BIGTABLE_CHANGE_STREAM_APP_PROFILE
:Bigtable 应用配置文件 IDBIGTABLE_READ_INSTANCE_ID
:源 Bigtable 实例 IDBIGTABLE_READ_TABLE_ID
:源 Bigtable 表 ID