在 Dataform 中使用缓慢变化维度

本文档将以在 Dataform 中使用开源“缓慢变化的维度”软件包为例,介绍如何使用开源软件包。

“缓慢变化的维度”软件包包含常用的数据模型,用于在 Dataform 中根据可变数据源创建第 2 类型的缓慢变化的维度表。

缓慢变化的维度表是增量表,其中包含的数据可能会不按常规时间表发生不可预测的变化,例如客户或产品。在类型 2 的缓慢变化维度表中,新数据会附加到新行中,而不会覆盖现有表行。表历史记录会保留在缓慢变化的维度键中的给定键的多个记录中。每个记录都有一个唯一键。

缓慢变化维度软件包会在 BigQuery 中为给定 NAME 创建以下关系:

  • NAME - 包含 scd_valid_fromscd_valid_to 字段的视图
  • NAME_updates - 用于存储源表更改历史记录的增量表

每次执行缓慢变化维度增量表时,Dataform 都会更新缓慢变化维度。您可能需要将缓慢变化的维度表安排每天或每小时运行一次,具体取决于您要捕获的更改粒度。

如需了解如何安排 Dataform 执行作业,请参阅使用 Cloud Composer 安排执行作业使用 Workflows 和 Cloud Scheduler 安排执行作业

准备工作

  1. “缓慢变化维度”版本页面上,复制最新版本的 .tar.gz 网址。
  2. 创建 Dataform 代码库
  3. 在代码库中创建并初始化工作区
  4. 在 Dataform 代码库中安装“缓慢变化维度”软件包

所需的角色

如需获得配置软件包所需的权限,请让您的管理员为您授予工作区的 Dataform Editor (roles/dataform.editor) IAM 角色。 如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

您也可以通过自定义角色或其他预定义角色来获取所需的权限。

使用“缓慢变化维度”软件包创建缓慢变化维度表

如需在 Dataform 中使用“缓慢变化维度”软件包创建缓慢变化维度表,请按以下步骤操作:

  1. 文件窗格中,点击 definitions/ 旁边的 更多菜单。

  2. 点击创建文件

  3. 创建新文件窗格中,执行以下操作:

    1. 添加文件路径字段中,在 definitions/ 后面输入文件名称,后跟 .js。例如 definitions/definitions.js

    2. 点击创建文件

  4. Files 窗格中,选择新创建的 .js。写入文件

  5. 将该软件包以以下格式导入到文件中:

     const CONSTANT-NAME = require("dataform-scd");
    

    CONSTANT-NAME 替换为常量的名称,例如 scd

  6. 请按照以下格式创建缓慢变化维度表格:

    scd("source_data_scd", {
      uniqueKey: "UNIQUE_ID",
      timestamp: "UPDATED_AT", // A field that stores a timestamp or date of when the row was last changed.
      source: {
        schema: "SOURCE_SCHEMA",     // The source table to build slowly changing dimensions from.
        name: "SOURCE_SCHEMA_NAME",
      },
      incrementalConfig: {        // Any configuration parameters to apply to the incremental table that will be created.
        bigquery: {
          partitionBy: "UPDATED_AT",
        },
      },
    });
    

    替换以下内容:

    • UNIQUE_ID:表中行的唯一标识符
    • UPDATED_AT:用于存储行上次更改的时间戳或日期的字段的名称,例如 updated_at
    • SOURCE_SCHEMA:源表的架构,例如 dataform_scd_example
    • SOURCE_SCHEMA_NAME:源表的名称,例如 source_data
  7. 可选:点击格式

以下代码示例展示了使用“缓慢变化维度”软件包创建的缓慢变化维度表定义:

const scd = require("dataform-scd");

/**
 * Create an SCD table on top of the table defined in source_data.sqlx.
 */
const { updates, view } = scd("source_data_scd", {
  // A unique identifier for rows in the table.
  uniqueKey: "user_id",
  // A field that stores a timestamp or date of when the row was last changed.
  timestamp: "updated_at",
  // The source table to build slowly changing dimensions from.
  source: {
    schema: "dataform_scd_example",
    name: "source_data",
  },
  // Any tags that will be added to actions.
  tags: ["slowly-changing-dimensions"],
  // Documentation of table columns
  columns: {user_id: "User ID", some_field: "Data Field", updated_at: "Timestamp for updates"},
  // Configuration parameters to apply to the incremental table that will be created.
  incrementalConfig: {
    bigquery: {
      partitionBy: "updated_at",
    },
  },
});

// Additional customization of the created models can be done by using the returned actions objects.
updates.config({
  description: "Updates table for SCD",
});

后续步骤