本文档展示了可用于在 Dataform 中创建工作流的 Dataform 核心和 JavaScript 脚本示例。
创建表格
使用 Dataform 核心创建视图
以下代码示例展示了 definitions/new_view.sqlx
文件中名为 new_view
的视图的定义:
config { type: "view" }
SELECT * FROM source_data
使用 Dataform 核心创建具体化视图
以下代码示例显示了 definitions/new_materialized_view.sqlx
文件中名为 new_materialized_view
的具体化视图的定义:
config {
type: "view",
materialized: true
}
SELECT * FROM source_data
使用 Dataform 核心创建表
以下代码示例展示了 definitions/new_table.sqlx
文件中名为 new_table
的表的定义:
config { type: "table" }
SELECT * FROM source_data
使用 Dataform 核心创建增量表
以下代码示例展示了一个增量表,该表会以增量方式处理 productiondb.logs
表的行:
config { type: "incremental" }
SELECT timestamp, message FROM ${ref("productiondb", "logs")}
${when(incremental(), `WHERE timestamp > (SELECT MAX(timestamp) FROM ${self()})`) }
使用 ref
函数通过 Dataform Core 引用表
以下代码示例展示了 ref
函数,该函数用于引用 definitions/new_table_with_ref.sqlx
表定义文件中的 source_data
表:
config { type: "table" }
SELECT * FROM ${ref("source_data")}
使用 Dataform 核心为表、视图或声明添加文档
以下代码示例展示了 definitions/documented_table.sqlx
表定义文件中的表和列说明:
config { type: "table",
description: "This table is an example",
columns:{
user_name: "Name of the user",
user_id: "ID of the user"
}
}
SELECT user_name, user_id FROM ${ref("source_data")}
配置增量表
使用 Dataform Core 为源数据中的新日期添加新的表格行
以下代码示例展示了 definitions/incremental_table.sqlx
文件中增量表的配置。在此配置中,Dataform 会为每个新日期向 incremental_table
中附加一个新行:
config { type: "incremental" }
SELECT date(timestamp) AS date, action
FROM weblogs.user_actions
${ when(incremental(), `WHERE timestamp > (select max(date) FROM ${self()})`)
使用 Dataform Core 定期拍摄表的快照
以下代码示例展示了 definitions/snapshots_table.sqlx
文件中增量表的配置。在此配置中,Dataform 会在指定日期创建 productiondb.customers
的快照,并将其存储在 snapshots_table
中:
config { type: "incremental" }
SELECT current_date() AS snapshot_date, customer_id, name, account_settings FROM productiondb.customers
${ when(incremental(), `WHERE snapshot_date > (SELECT max(snapshot_date) FROM ${self()})`) }
使用 Dataform Core 构建可增量更新的滚动 30 天表
以下代码示例展示了 definitions/incremental_example.sqlx
文件中增量表的配置。在此配置中,Dataform 会创建一个临时 incremental_example
,该表会以增量方式更新,并在创建 30 天后删除:
config {type: "incremental"}
post_operations {
delete FROM ${self()} WHERE date < (date_add(Day, -30, CURRENT_DATE))
}
SELECT
date(timestamp) AS date,
order_id,
FROM source_table
${ when(incremental(), `WHERE timestamp > (SELECT max(date) FROM ${self()})`) }
创建自定义 SQL 操作
使用 Dataform 核心在 SQLX 文件中运行多个 SQL 操作
以下代码示例展示了如何使用 ;
分隔 definitions/operations.sqlx
中定义的多个 SQL 操作:
config { type: "operations" }
DELETE FROM datatable where country = 'GB';
DELETE FROM datatable where country = 'FR';
在创建 Dataform 核心表之前运行自定义 SQL
以下代码示例展示了在 definitions/table_with_preops.sqlx
表定义文件的 pre_operations
块中定义的自定义 SQL 操作:
config {type: "table"}
SELECT * FROM ...
pre_operations {
INSERT INTO table ...
}
使用 Dataform Core 创建表后运行自定义 SQL
以下代码示例展示了在 definitions/table_with_postops.sqlx
表定义文件的 post_operations
块中定义的自定义 SQL 操作:
config {type: "table"}
SELECT * FROM ...
post_operations {
GRANT `roles/bigquery.dataViewer`
ON
TABLE ${self()}
TO "group:allusers@example.com", "user:otheruser@example.com"
}
验证表格
使用 Dataform Core 向表、视图或声明添加断言
以下代码示例展示了添加到 definitions/tested_table.sqlx
表定义文件中的 uniqueKey
、nonNull
和 rowConditions
断言:
config {
type: "table",
assertions: {
uniqueKey: ["user_id"],
nonNull: ["user_id", "customer_id"],
rowConditions: [
'signup_date is null or signup_date > "2022-01-01"',
'email like "%@%.%"'
]
}
}
SELECT ...
使用 Dataform Core 添加自定义断言
以下代码示例展示了表定义文件中的自定义断言,该断言用于验证 source_data
中的列 a
、b
或 c
是否为 null
:
config { type: "assertion" }
SELECT
*
FROM
${ref("source_data")}
WHERE
a is null
or b is null
or c is null
使用 JavaScript 进行开发
使用 JavaScript 中的内嵌变量和函数
以下代码示例展示了在 js
块中定义的 foo
变量,然后如何在 SQLX 文件中以内嵌方式使用该变量:
js {
const foo = 1;
function bar(number){
return number+1;
}
}
SELECT
${foo} AS one,
${bar(foo)} AS two
使用 JavaScript 为每个国家/地区生成一个表格
以下代码示例展示了如何使用 forEach
函数在 definitions/one_table_per_country.js
文件中为 countries
中定义的每个国家/地区生成一个表:
const countries = ["GB", "US", "FR", "TH", "NG"];
countries.forEach(country => {
publish("reporting_" + country)
.dependencies(["source_table"])
.query(
ctx => `
SELECT '${country}' AS country
`
);
});
使用 JavaScript 在一个文件中声明多个来源
以下代码示例展示了如何在 definitions/external_dependencies.js
文件中声明多个数据源:
declare({
schema: "stripe",
name: "charges"
});
declare({
schema: "shopify",
name: "orders"
});
declare({
schema: "salesforce",
name: "accounts"
});
使用 forEach
在一个文件中声明多个来源
以下代码示例展示了如何在 definitions/external_dependencies.js
文件中使用 forEach
函数声明多个数据源:
["charges", "subscriptions", "line_items", "invoices"]
.forEach(source => declare({
schema: "stripe",
name: source
})
);
使用 JavaScript 删除包含个人身份信息的所有表中的敏感信息
以下代码示例展示了 definitions/delete_pii.js
文件中的一个函数,该函数会删除包含个人身份信息 (PII) 的所有表中的所选信息:
const pii_tables = ["users", "customers", "leads"];
pii_tables.forEach(table =>
operate(`gdpr_cleanup: ${table}`,
ctx => `
DELETE FROM raw_data.${table}
WHERE user_id in (SELECT * FROM users_who_requested_deletion)`)
.tags(["gdpr_deletion"]))
);
使用 JavaScript 添加 preOps
和 postOps
以下代码示例展示了如何使用 publish
函数在 definitions/pre_and_post_ops_example.js
表中创建包含 preOps
和 postOps
的查询:
publish("example")
.preOps(ctx => `GRANT \`roles/bigquery.dataViewer\` ON TABLE ${ctx.ref("other_table")} TO "group:automation@example.com"`)
.query(ctx => `SELECT * FROM ${ctx.ref("other_table")}`)
.postOps(ctx => `REVOKE \`roles/bigquery.dataViewer\` ON TABLE ${ctx.ref("other_table")} TO "group:automation@example.com"`)
使用 JavaScript 创建增量表
以下代码示例展示了用于在 definitions/incremental_example.js
文件中创建增量表的 publish
函数:
publish("incremental_example", {
type: "incremental"
}).query(ctx => `
SELECT * FROM ${ctx.ref("other_table")}
${ctx.when(ctx.incremental(),`WHERE timestamp > (SELECT MAX(date) FROM ${ctx.self()}`)}
`)
使用 JavaScript 回填每日表
以下代码示例展示了如何在 definitions/backfill_daily_data.js
文件中回填每日更新的表:
var getDateArray = function(start, end) {
var startDate = new Date(start); //YYYY-MM-DD
var endDate = new Date(end); //YYYY-MM-DD
var arr = new Array();
var dt = new Date(startDate);
while (dt <= endDate) {
arr.push(new Date(dt).toISOString().split("T")[0]);
dt.setDate(dt.getDate() + 1);
}
return arr;
};
var dateArr = getDateArray("2020-03-01", "2020-04-01");
// step 1: create table
operate(`create table`, 'create table if not exists backfill_table (`fields`) `);
// step 2: insert into the table
dateArr.forEach((day, i) =>
operate(`backfill ${day}`
`insert into backfill_table select fields where day = '${day}'`)
);
通过包含指令重用代码
使用 JavaScript 中的全局变量
以下代码示例展示了 includes/constants.js
中 project_id
和 first_date
常量的定义:
const project_id = "project_id";
const first_date = "'1970-01-01'";
module.exports = {
project_id,
first_date
};
以下代码示例展示了 definitions/new_table.sqlx
文件中引用的 first_date
常量:
config {type: "table"}
SELECT * FROM source_table WHERE date > ${constants.first_date}
使用 JavaScript 创建国家/地区映射
以下代码示例展示了在 includes/mapping.js
文件中定义的 country_group
自定义函数:
function country_group(country){
return `
case
when ${country} in ('US', 'CA') then 'NA'
when ${country} in ('GB', 'FR', 'DE', 'IT', 'PL', 'SE') then 'EU'
when ${country} in ('AU') then ${country}
else 'Other'
end`;
}
module.exports = {
country_group
};
以下代码示例展示了在 definitions/new_table.sqlx
表定义文件中使用 country_group
函数的表定义:
config { type: "table"}
SELECT
country AS country,
${mapping.country_group("country")} AS country_group,
device_type AS device_type,
sum(revenue) AS revenue,
sum(pageviews) AS pageviews,
sum(sessions) AS sessions
FROM ${ref("source_table")}
GROUP BY 1, 2, 3
以下代码示例展示了 definitions/new_table.sqlx
中定义的查询编译为 SQL 的结果:
SELECT
country AS country,
case
when country in ('US', 'CA') then 'NA'
when country in ('GB', 'FR', 'DE', 'IT', 'PL', 'SE') then 'EU'
when country in ('AU') then country
else 'Other'
end AS country_group,
device_type AS device_type,
sum(revenue) AS revenue,
sum(pageviews) AS pageviews,
sum(sessions) AS sessions
FROM "dataform"."source_table"
GROUP BY 1, 2, 3
使用自定义 JavaScript 函数生成 SQL 脚本
以下代码示例展示了在 includes/script_builder.js
中定义的 render_script
自定义函数:
function render_script(table, dimensions, metrics) {
return `
SELECT
${dimensions.map(field => `${field} AS ${field}`).join(",")},
${metrics.map(field => `sum(${field}) AS ${field}`).join(",\n")}
FROM ${table}
GROUP BY ${dimensions.map((field, i) => `${i + 1}`).join(", ")}
`;
}
module.exports = { render_script };
以下代码示例展示了在 definitions/new_table.sqlx
表定义文件中使用 render_script
函数的表定义:
config {
type: "table",
tags: ["advanced", "hourly"],
disabled: true
}
${script_builder.render_script(ref("source_table"),
["country", "device_type"],
["revenue", "pageviews", "sessions"]
)}
以下代码示例展示了 definitions/new_table.sqlx
中定义的查询编译为 SQL 的结果:
SELECT
country AS country,
device_type AS device_type,
sum(revenue) AS revenue,
sum(pageviews) AS pageviews,
sum(sessions) AS sessions
FROM "dataform"."source_table"
GROUP BY 1, 2
操作配置
加载包含操作配置的 SQL 文件
操作配置有助于加载纯 SQL 文件。您可以在 definitions
文件夹中的 actions.yaml
文件中定义操作配置。
如需详细了解可用的操作类型和有效的操作配置选项,请参阅 Dataform 配置参考。
以下代码示例展示了 definitions/actions.yaml
文件中名为 new_view
的视图的定义:
actions:
- view:
filename: new_view.sql
上述代码示例中引用的 definitions/new_view.sql
SQL 文件包含纯 SQL:
SELECT * FROM source_data