Datastream to Spanner 模板

Datastream to Spanner 模板是一种流处理流水线,可从 Cloud Storage 存储桶中读取 Datastream 事件并将其写入 Spanner 数据库。它适用于从 Datastream 来源到 Spanner 的数据迁移。指定 gcsPubSubSubscription 参数以从 Pub/Sub 通知中读取数据,或者提供 inputFilePattern 参数以直接从 Cloud Storage 中的文件中读取数据。

在执行模板之前,迁移所需的所有表必须存在于目标 Spanner 数据库中。因此,在数据迁移之前,必须完成从源数据库到目标 Spanner 的架构迁移。在迁移之前,数据可能存在表中。此模板不会将 Datastream 架构更改传播到 Spanner 数据库。

只有在所有数据都写入 Spanner 后,才能在迁移结束时保证数据一致性。为了存储写入 Spanner 的每个记录的排序信息,此模板为 Spanner 数据库中的每个表创建了一个额外的表(称为影子表)。这用于确保迁移结束时的一致性。影子表在迁移后不会被删除,可在迁移结束时用于进行验证。

操作期间发生的任何错误(例如架构不匹配、JSON 文件格式错误或执行转换产生的错误)都会记录在错误队列中。错误队列是一个 Cloud Storage 文件夹,它以文本格式存储遇到错误的所有 Datastream 事件以及错误原因。这些错误可能是暂时性的,也可能是永久性的,它们存储在错误队列的相应 Cloud Storage 文件夹中。系统会自动重试暂时性错误,但不会自动重试永久性错误。如果发生永久性错误,您可以选择在模板运行期间更正更改事件,并将它们转移到可重试的存储桶。

流水线要求

  • 处于正在运行未启动状态的 Datastream 数据流。
  • 要在其中复制 Datastream 事件的 Cloud Storage 存储桶。
  • 包含现有表的 Spanner 数据库。这些表可以为空,也可以包含数据。

模板参数

必需参数

  • instanceId:在其中复制更改的 Spanner 实例。
  • databaseId:在其中复制更改的 Spanner 数据库。

可选参数

  • inputFilePattern:包含要复制的 Datastream 文件的 Cloud Storage 文件位置。通常,这是数据流的根路径。此功能的支持已被停用。请仅将此功能用于重试进入严重 DLQ 的条目。
  • inputFileFormat:Datastream 生成的输出文件的格式。例如 avro,json。默认值为 avro
  • sessionFilePath:Cloud Storage 中的会话文件路径,其中包含 HarbourBridge 的映射信息。
  • projectId:Spanner 项目 ID。
  • spannerHost:要在模板中调用的 Cloud Spanner 端点。例如 https://batch-spanner.googleapis.com。默认值为:https://batch-spanner.googleapis.com
  • gcsPubSubSubscription:Cloud Storage 通知政策中使用的 Pub/Sub 订阅。对于名称,请使用 projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME> 格式。
  • streamName:用于轮询架构信息和来源类型的数据流的名称或模板。
  • shadowTablePrefix:用于为影子表命名的前缀。默认值:shadow_
  • shouldCreateShadowTables:此标志指示是否必须在 Cloud Spanner 数据库中创建影子表。默认值为:true。
  • rfcStartDateTime:用于从 Cloud Storage 中提取数据的起始日期时间 (https://tools.ietf.org/html/rfc3339)默认值为:1970-01-01T00:00:00.00Z。
  • fileReadConcurrency:要读取的并发 DataStream 文件的数量。默认值为:30。
  • deadLetterQueueDirectory:存储错误队列输出时使用的文件路径。默认文件路径为 Dataflow 作业的临时位置下的目录。
  • dlqRetryMinutes:死信队列重试之间的分钟数。默认值为 10
  • dlqMaxRetryCount:可通过 DLQ 重试临时错误的次数上限。默认值为 500
  • dataStreamRootUrl:Datastream API 根网址。默认值为 https://datastream.googleapis.com/
  • datastreamSourceType:这是 Datastream 连接到的源数据库的类型。示例:mysql/oracle。在测试时,如果没有实际运行的 Datastream,则需要进行设置。
  • roundJsonDecimals:如果设置了此标志,则会将 json 列中的小数值四舍五入为可以在不损失精度的情况下存储的数字。默认值为:false。
  • runMode:这是运行模式类型,无论是常规模式还是具有 retryDLQ 的模式。默认值为:regular。
  • transformationContextFilePath:Cloud Storage 中的转换上下文文件路径,用于填充迁移期间执行转换时使用的数据。示例:用于标识从中迁移行的数据库的分片 ID 到数据库名称。
  • directoryWatchDurationInMinutes:流水线应持续轮询 GCS 中某个目录的时长。Datastream 输出文件以目录结构进行排列,该目录结构描述了按分钟分组的事件时间戳。此参数应大致等于在源数据库中发生的事件与 Datastream 写入 GCS 的同一事件之间可能出现的最长延迟时间。第 99.9 百分位 = 10 分钟。默认值为:10。
  • spannerPriority:Cloud Spanner 调用的请求优先级。该值必须为以下值之一:[HIGH,MEDIUM,LOW]。默认值为 HIGH
  • dlqGcsPubSubSubscription:在常规模式下运行时,Cloud Storage 通知政策中用于 DLQ 重试目录的 Pub/Sub 订阅。对于名称,请使用 projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME> 格式。设置后,系统会忽略 deadLetterQueueDirectory 和 dlqRetryMinutes。
  • transformationJarPath:Cloud Storage 中的自定义 JAR 文件位置,其中包含用于处理正向迁移中的记录的自定义转换逻辑。默认值为空。
  • transformationClassName:包含自定义转换逻辑的完全限定类名称。如果指定了 transformationJarPath,则这是必填字段。默认值为空。
  • transformationCustomParameters:包含要传递给自定义转换类的任何自定义参数的字符串。默认值为空。
  • filteredEventsDirectory:这是用于存储通过自定义转换过滤的事件的文件路径。默认值为 Dataflow 作业的临时位置下的目录。在大多数情况下,默认值就可以了。
  • shardingContextFilePath:Cloud Storage 中的分片上下文文件路径,用于填充 Spanner 数据库中每个源分片的分片 ID。其格式为 Map<stream_name, Map<db_name, shard_id>>。
  • tableOverrides:这些是从源到 Spanner 的表名称替换。它们采用以下格式编写:[{SourceTableName1, SpannerTableName1}, {SourceTableName2, SpannerTableName2}]此示例展示了如何将 Singers 表映射到 Vocalists 表,并将 Albums 表映射到 Records 表。例如 [{Singers, Vocalists}, {Albums, Records}]。默认值为空。
  • columnOverrides:这些是从源到 Spanner 的列名称替换。它们的格式如下:[{SourceTableName1.SourceColumnName1, SourceTableName1.SpannerColumnName1}, {SourceTableName2.SourceColumnName1, SourceTableName2.SpannerColumnName1}]请注意,SourceTableName 在源和 Spanner 对中应保持不变。如需替换表名称,请使用 tableOverrides。此示例展示了如何分别将 Singers 表中的 SingerName 映射到 TalentName,以及将 Albums 表中的 AlbumName 映射到 RecordName。例如 [{Singers.SingerName, Singers.TalentName}, {Albums.AlbumName, Albums.RecordName}]。默认值为空。
  • schemaOverridesFilePath:一个文件,用于指定从源到 Spanner 的表和列名称替换。默认值为空。
  • shadowTableSpannerDatabaseId:可选的单独数据库,用于存储影子表。如果未指定,则会在主数据库中创建影子表。如果指定了该值,请确保还指定了 shadowTableSpannerInstanceId。默认值为空。
  • shadowTableSpannerInstanceId:可选的影子表专用实例。如果未指定,则会在主实例中创建影子表。如果指定了该值,请确保还指定了 shadowTableSpannerDatabaseId。默认值为空。
  • failureInjectionParameter:故障注入参数。仅用于测试。默认值为空。

运行模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Cloud Datastream to Spanner template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_Spanner \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ
  

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • GCS_FILE_PATH:用于存储 Datastream 事件的 Cloud Storage 路径。例如 gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE:Spanner 实例。
  • CLOUDSPANNER_DATABASE:Spanner 数据库。
  • DLQ:错误队列目录的 Cloud Storage 路径。

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_Spanner",
      "parameters": {
          "inputFilePattern": "GCS_FILE_PATH",
          "streamName": "STREAM_NAME"
          "instanceId": "CLOUDSPANNER_INSTANCE"
          "databaseId": "CLOUDSPANNER_DATABASE"
          "deadLetterQueueDirectory": "DLQ"
      }
   }
}
  

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • GCS_FILE_PATH:用于存储 Datastream 事件的 Cloud Storage 路径。例如 gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE:Spanner 实例。
  • CLOUDSPANNER_DATABASE:Spanner 数据库。
  • DLQ:错误队列目录的 Cloud Storage 路径。

后续步骤