使用模板处理数据

Dataplex Universal Catalog 提供由 Dataflow 提供支持的模板,用于执行常见的数据处理任务,例如数据注入、处理和管理数据生命周期。本指南将介绍如何配置和运行数据处理模板。

准备工作

Dataplex Universal Catalog 模板由 Dataflow 提供支持。 在使用模板之前,请启用 Dataflow API。

启用 Dataflow API

请注意以下几点:

  • 所有模板都支持常见的 Dataflow 流水线选项

  • Dataplex Universal Catalog 使用数据流水线来安排模板定义的任务。

  • 您只能在 Google Cloud 控制台的 Dataplex Universal Catalog 页面中看到通过 Dataplex Universal Catalog 安排的任务。

模板:将原始数据转换为精选数据

Dataplex Universal Catalog 文件格式转换模板可将 Dataplex Universal Catalog Cloud Storage 资产中的数据或以 CSV 或 JSON 格式存储的 Dataplex Universal Catalog 实体列表转换为 Parquet 或 Avro 格式的数据,并将其存储在另一个 Dataplex Universal Catalog 资产中。转换过程中会保留分区布局。它还支持压缩输出文件。

模板参数

参数 说明
inputAssetOrEntitiesList 包含输入文件的 Dataplex Universal Catalog 资产或 Dataplex Universal Catalog 实体。此参数必须遵循以下格式: projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/assets/<asset-name>projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/entities/<entity1-name>,projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/entities/<entity 2 name>...
outputFileFormat Cloud Storage 中的输出文件格式。此参数必须遵循以下格式:PARQUETAVRO
outputAsset 包含将用于存储输出文件的 Cloud Storage 存储桶的 Dataplex 通用目录资源的名称。此参数必须遵循以下格式:projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/assets/<asset-name>。您可以在 Google Cloud 控制台的 Dataplex Universal Catalog 资源 Details 标签页中找到 outputAsset
outputFileCompression 可选:输出文件压缩。此形参的默认值为 SNAPPY。该参数的其他值可以是 UNCOMPRESSEDSNAPPYGZIPBZIP2BZIP2 不支持 PARQUET 文件。
writeDisposition 可选:指定目标文件已存在时执行的操作。此参数的默认值为 SKIP,表示仅处理目标目录中不存在的文件。该形参的其他值可以是 OVERWRITE(覆盖所有现有文件)或 FAIL(不处理任何内容,如果至少有一个目标文件已存在,则会生成错误)。
updateDataplexMetadata

可选:是否更新新创建实体的 Dataplex Universal Catalog 元数据。此参数的默认值为 false

如果启用,管道会将架构从源自动复制到目标 Dataplex 实体,并且不会针对这些实体运行自动 Dataplex Universal Catalog 发现。如果源(原始)数据的架构由 Dataplex 管理,请使用此标志。

运行模板

控制台

  1. 在 Google Cloud 控制台中,前往 Dataplex Universal Catalog 页面。

    前往 Dataplex Universal Catalog

  2. 前往进程视图。

  3. 点击创建任务

  4. 转换为专业格式下,点击创建任务

  5. 选择一个 Dataplex Universal Catalog 数据湖。

  6. 提供任务名称。

  7. 选择任务执行区域。

  8. 填写必需参数。

  9. 点击继续

gcloud

在 shell 或终端中,运行模板:

gcloud beta dataflow flex-template run JOB_NAME \
--project=PROJECT_ID \
--region=REGION_NAME \
--template-file-gcs-location=gs://dataflow-templates-REGION_NAME/latest/flex/Dataplex_File_Format_Conversion_Preview \
--parameters \
inputAssetOrEntitiesList=INPUT_ASSET_OR_ENTITIES_LIST,\
outputFileFormat=OUTPUT_FILE_FORMAT,\
outputAsset=OUTPUT_ASSET

替换以下内容:

JOB_NAME: a job name of your choice
PROJECT_ID: your template project ID
REGION_NAME: region in which to run the job
INPUT_ASSET_OR_ENTITIES_LIST: path to your JDBC drivers
OUTPUT_FILE_FORMAT: your output file format in Cloud Storage
OUTPUT_ASSET: your Dataplex Universal Catalog output asset ID

REST

提交 HTTP POST 请求:

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION_NAME/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "parameters": {
        "inputAssetOrEntitiesList": "INPUT_ASSET_OR_ENTITIES_LIST",
        "outputFileFormat": "OUTPUT_FILE_FORMAT",
        "outputAsset": "OUTPUT_ASSET",
    },
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/latest/flex/Dataplex_File_Format_Conversion_Preview",
 }
}

替换以下内容:

PROJECT_ID: your template project ID
REGION_NAME: region in which to run the job
JOB_NAME: a job name of your choice
INPUT_ASSET_OR_ENTITIES_LIST: path to your JDBC drivers
OUTPUT_FILE_FORMAT: your output file format in Cloud Storage
OUTPUT_ASSET: your Dataplex Universal Catalog output asset ID

模板:将 BigQuery 资产中的数据分层到 Cloud Storage 资产

Dataplex Universal Catalog BigQuery 到 Cloud Storage 模板可将数据从 Dataplex Universal Catalog BigQuery 资产复制到 Dataplex Universal Catalog Cloud Storage 资产,并采用与 Dataplex Universal Catalog 兼容的布局和格式。您可以指定要复制的 BigQuery 数据集或 BigQuery 表的列表。为了提高灵活性,该模板允许复制修改日期早于指定日期的旧数据,并且允许在成功复制后选择性地从 BigQuery 中删除数据。

将分区表从 BigQuery 复制到 Cloud Storage 时:

  • 该模板会在 Cloud Storage 存储桶上创建 Hive 样式的分区。 BigQuery 的 Hive 样式分区键不能与现有列相同。您可以使用选项 enforceSamePartitionKey 创建新的分区键,也可以保留相同的分区键,但重命名现有列。
  • 在创建 BigQuery 表(以及 Dataproc Metastore 中的表)时,Dataplex Universal Catalog 发现功能会将分区类型注册为 string。这可能会影响您现有的分区过滤条件。

单个模板运行中可转换的表和分区数量有限制,约为 300 个。确切的数量取决于表名称的长度和其他因素。

模板参数

参数 说明
sourceBigQueryDataset 用于分层存储数据的 BigQuery 数据集。此参数必须包含 Dataplex Universal Catalog 资产名称(格式为 projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/assets/<asset-name>)或 BigQuery 数据集 ID(格式为 projects/<name>/datasets/<dataset-id>)。
destinationStorageBucketAssetName 要将数据分层存储到的 Cloud Storage 存储桶的 Dataplex Universal Catalog 资产名称。此参数必须遵循 projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/assets/<asset-name> 格式。
tables 可选:要分层的 BigQuery 表的英文逗号分隔列表。如果未提供任何列表,则所有表都将分层。必须仅按名称指定表(无项目或数据集前缀),并且区分大小写。
exportDataModifiedBeforeDateTime 可选:使用此参数可移动早于指定日期(和可选时间)的数据。对于已分区的 BigQuery 表,请移动在此日期/时间之前最后修改的分区。对于非分区表,如果表的上次修改时间早于此日期/时间,则移动。如果未指定,则移动所有表/分区。默认情况下,日期/时间以默认时区进行解析,但支持可选后缀 Z+HH:mm。此参数必须遵循 YYYY-MM-DDYYYY-MM-DDTHH:mm:ssYYYY-MM-DDTHH:mm:ss+03:00 格式。 系统还支持相对日期/时间,但必须遵循 -PnDTnHnMn.nS 格式(必须以 -P 开头,表示过去的时间)。
fileFormat 可选:Cloud Storage 中的输出文件格式。此参数的默认值为 PARQUET。参数的另一个值可以是 AVRO
fileCompression 可选:输出文件压缩。此形参的默认值为 SNAPPY。该参数的其他值可以是 UNCOMPRESSEDSNAPPYGZIPBZIP2。不支持对 PARQUET 文件使用 BZIP2
deleteSourceData 可选:是否在成功导出后从 BigQuery 中删除源数据。值可以是 truefalse。此参数的默认值为 false
partitionIdRegExp 可选:仅处理分区 ID 与此正则表达式匹配的分区。如果未提供值,此参数默认设置为处理所有内容。
writeDisposition 可选:指定目标文件已存在时执行的操作,这意味着一个或多个表/分区已预先分层。此参数的默认值为 SKIP,表示仅处理尚未预先分层的表/分区。该形参的其他值可以是 OVERWRITE(覆盖所有现有文件)或 FAIL(不处理任何内容,如果至少有一个目标文件已存在,则会生成错误)。
enforceSamePartitionKey

可选:是否强制使用相同的分区键。由于 BigQuery 存在限制,分区外部表中的分区键(位于文件路径中)不能与文件中的任何列同名。如果此参数为 true(默认值),则目标文件的分区键将设置为原始分区列名称,并且文件中的列将重命名。如果为 false,则重命名分区键。

例如,如果原始表基于名为 TSenforceSamePartitionKey=true 的列进行分区,则目标文件路径为 gs://<bucket>/TS=<partition ID>/<file>,并且该列在文件中重命名为 TS_pkey。这样一来,现有查询就可以针对旧表或新表中的相同分区执行。

如果值为 enforceSamePartitionKey=false,则目标文件路径为 gs://<bucket>/TS_pid=<partition ID>/<file>,但列名称在文件中仍为 TS

updateDataplexMetadata

可选:是否更新新创建实体的 Dataplex Universal Catalog 元数据。此参数的默认值为 false

如果启用,管道会将架构从源自动复制到目标 Dataplex 实体,并且不会针对这些实体运行自动 Dataplex Universal Catalog 发现。如果您要管理源 BigQuery 表的架构,请使用此标志。

运行模板

控制台

  1. 在 Google Cloud 控制台中,前往 Dataplex Universal Catalog 页面。

    前往 Dataplex Universal Catalog

  2. 前往进程视图。

  3. 点击创建任务

  4. 从 BQ 到 GCS 资产分层下,点击创建任务

  5. 选择一个 Dataplex Universal Catalog 数据湖。

  6. 提供任务名称。

  7. 选择任务执行区域。

  8. 填写必需参数。

  9. 点击继续

gcloud

在 shell 或终端中,运行模板:

gcloud beta dataflow flex-template run JOB_NAME \
--project=PROJECT_ID \
--region=REGION_NAME \
--template-file-gcs-location=gs://dataflow-templates-REGION_NAME/latest/flex/Dataplex_BigQuery_to_GCS_Preview \
--parameters \
sourceBigQueryDataset=SOURCE_ASSET_NAME_OR_DATASET_ID,\
destinationStorageBucketAssetName=DESTINATION_ASSET_NAME

替换以下内容:

JOB_NAME: a job name of your choice
PROJECT_ID: your template project ID
REGION_NAME: region in which to run the job
SOURCE_ASSET_NAME_OR_DATASET_ID: your Dataplex Universal Catalog asset
name for the source BigQuery dataset, or the dataset ID
DESTINATION_ASSET_NAME: your Dataplex Universal Catalog asset name for
the destination Cloud Storage bucket

REST

提交 HTTP POST 请求:

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION_NAME/flexTemplates:launch
{
 "launch_parameter": {
    "jobName": "JOB_NAME",
    "parameters": {
        "sourceBigQueryDataset": "SOURCE_ASSET_NAME_OR_DATASET_ID",
        "destinationStorageBucketAssetName": "DESTINATION_ASSET_NAME",
    },
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/latest/flex/Dataplex_BigQuery_to_GCS_Preview",
 }
}

替换以下内容:

PROJECT_ID: your template project ID
REGION_NAME: region in which to run the job
JOB_NAME: a job name of your choice
SOURCE_ASSET_NAME_OR_DATASET_ID: your Dataplex Universal Catalog asset
name for the source BigQuery dataset, or the dataset ID
DESTINATION_ASSET_NAME: your Dataplex Universal Catalog asset name for
the destination Cloud Storage bucket
REGION_NAME: region in which to run the job

安排其他 Google Cloud提供的或自定义的 Dataflow 模板

借助 Dataplex Universal Catalog,您可以在控制台中安排和监控任何Google Cloud提供的 Dataflow 模板或自定义 Dataflow 模板。

时间表

控制台

  1. 在 Google Cloud 控制台中,前往 Dataplex Universal Catalog 页面。

    前往 Dataplex Universal Catalog

  2. 前往进程视图。

  3. 点击创建任务

  4. 编写 Dataflow 流水线下,点击创建 Dataflow 流水线

  5. 选择一个 Dataplex Universal Catalog 数据湖。

  6. 提供任务名称。

  7. 选择运行任务的区域。

  8. 选择 Dataflow 模板。

  9. 填写必需参数。

  10. 点击继续

监控

控制台

  1. 在 Google Cloud 控制台中,前往 Dataplex Universal Catalog 页面。

    前往 Dataplex Universal Catalog

  2. 前往进程视图。

  3. 点击 Dataflow 流水线

  4. 按数据湖或流水线名称过滤。