使用 Workflows 将数据从 Cloud Storage 加载到 BigQuery

Last reviewed 2021-05-12 UTC

本教程介绍如何使用 WorkflowsCloud Run 函数Firestore,将原始数据(例如事件日志)从 Cloud Storage 加载到 BigQuery。Analytics 平台通常具有一个编排工具,用于定期使用 BigQuery 作业在 BigQuery 中加载数据,然后使用 SQL 语句来转换数据(包括 BigQuery 过程语言语句)以提供业务指标。本教程适用于想要构建无服务器、事件驱动型数据处理流水线的开发者和架构师。本教程假定您熟悉 YAML、SQL 和 Python。

架构

下图显示了使用 Workflows 的无服务器提取、加载和转换 (ELT) 流水线的概要架构。

提取、加载和转换流水线。

在上图中,假设一个零售平台定期从各个商店以文件形式收集销售事件,然后将文件写入 Cloud Storage 存储分区。事件用于通过在 BigQuery 中导入和处理来提供业务指标。此架构提供了一个用于将文件导入到 BigQuery 中的可靠、无服务器的编排系统,它分为以下两个模块:

  • 文件列表:维护已添加到 Cloud Storage 存储分区内 Firestore 集合中的未处理文件的列表。此模块通过 Cloud Run 函数工作,该函数通过对象完成存储事件触发,而该事件在将新文件添加到 Cloud Storage 存储桶时生成。文件名会附加到 Firestore 中名为 new 的集合 files 数组中。
  • 工作流:运行计划的工作流。Cloud Scheduler 会触发一个工作流,它根据基于 YAML 的语法运行一系列步骤来编排加载,然后通过调用 Cloud Run 函数来转换 BigQuery 中的数据。工作流中的步骤调用 Cloud Run 函数以运行以下任务:

    • 创建并启动 BigQuery 加载作业。
    • 轮询加载作业状态。
    • 创建并启动转换查询作业。
    • 轮询转换作业状态。

通过使用事务来维护 Firestore 中的新文件列表,可帮助确保在工作流将这些文件导入 BigQuery 中时不会丢失任何文件。通过将作业元数据和状态存储在 Firestore 中,可以使得工作流的单独运行具有幂等性。

目标

  • 创建 Firestore 数据库。
  • 设置一个 Cloud Run 函数触发器,以跟踪添加到 Cloud Storage 存储桶区内 Firestore 中的文件。
  • 部署 Cloud Run 函数以运行和监控 BigQuery 作业。
  • 部署并运行工作流以自动执行此过程。

费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

完成本文档中描述的任务后,您可以通过删除所创建的资源来避免继续计费。如需了解详情,请参阅清理

准备工作

  1. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  2. Make sure that billing is enabled for your Google Cloud project.

  3. Enable the Cloud Build, Cloud Run functions, Identity and Access Management, Resource Manager, and Workflows APIs.

    Enable the APIs

  4. 前往欢迎页面,记下项目 ID,以便在后续步骤中使用。

    前往“欢迎”页面

  5. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

准备环境

如需准备环境,请创建 Firestore 数据库、从 GitHub 代码库克隆代码示例、使用 Terraform 创建资源、修改 Workflows YAML 文件,并安装文件生成器的要求。

  1. 如需创建 Firestore 数据库,请执行以下操作:

    1. 在 Google Cloud Console 中,前往 Firestore 页面。

      转到 Firestore

    2. 点击选择原生模式

    3. 选择位置菜单中,选择要托管 Firestore 数据库的区域。我们建议选择靠近您的物理位置的区域。

    4. 点击创建数据库

  2. 在 Cloud Shell 中,克隆源代码库:

    cd $HOME && git clone https://github.com/GoogleCloudPlatform/workflows-demos
    cd workflows-demos/workflows-bigquery-load
    
  3. 在 Cloud Shell 中,使用 Terraform 创建以下资源:

    terraform init
    terraform apply \
        -var project_id=PROJECT_ID \
        -var region=REGION \
        -var zone=ZONE \
        --auto-approve
    

    替换以下内容:

    • PROJECT_ID:您的 Google Cloud 项目 ID
    • REGION:用于托管资源的特定 Google Cloud 地理位置,例如 us-central1
    • ZONE:用于托管资源的区域内的位置,例如 us-central1-b

    您应该会看到如下所示的消息:Apply complete! Resources: 7 added, 0 changed, 1 destroyed.

    Terraform 可帮助您以可预测的方式安全地大规模创建、更改和升级基础架构。系统会在您的项目中创建以下资源:

    • 具有必需权限的服务账号,用于确保安全访问您的资源。
    • 名为 serverless_elt_dataset 的 BigQuery 数据集和名为 word_count 的表,用于加载传入的文件。
    • 名为 ${project_id}-ordersbucket 的 Cloud Storage 存储分区,用于暂存输入文件。
    • 以下五个 Cloud Run 函数:
      • file_add_handler 用于将添加到 Cloud Storage 存储分区的文件的名称添加到 Firestore 集合。
      • create_job 用于创建一个新的 BigQuery 加载作业,并将 Firebase 集合中的文件与该作业相关联。
      • create_query 用于创建一个新的 BigQuery 查询作业。
      • poll_bigquery_job 用于获取 BigQuery 作业的状态。
      • run_bigquery_job 用于启动 BigQuery 作业。
  4. 获取您在上一步中部署的 create_jobcreate_querypoll_jobrun_bigquery_job Cloud Run 函数的网址。

    gcloud functions describe create_job | grep url
    gcloud functions describe poll_bigquery_job | grep url
    gcloud functions describe run_bigquery_job | grep url
    gcloud functions describe create_query | grep url
    

    输出类似于以下内容:

    url: https://REGION-PROJECT_ID.cloudfunctions.net/create_job
    url: https://REGION-PROJECT_ID.cloudfunctions.net/poll_bigquery_job
    url: https://REGION-PROJECT_ID.cloudfunctions.net/run_bigquery_job
    url: https://REGION-PROJECT_ID.cloudfunctions.net/create_query
    

    请记下这些网址,因为您在部署工作流时需要它们。

创建和部署工作流

  1. 在 Cloud Shell 中,打开工作流的源文件 workflow.yaml

    main:
      steps:
        - constants:
            assign:
              - create_job_url: CREATE_JOB_URL
              - poll_job_url: POLL_BIGQUERY_JOB_URL
              - run_job_url: RUN_BIGQUERY_JOB_URL
              - create_query_url: CREATE_QUERY_URL
              - region: BQ_REGION
              - table_name: BQ_DATASET_TABLE_NAME
            next: createJob
    
        - createJob:
            call: http.get
            args:
              url: ${create_job_url}
              auth:
                  type: OIDC
              query:
                  region: ${region}
                  table_name: ${table_name}
            result: job
            next: setJobId
    
        - setJobId:
            assign:
              - job_id: ${job.body.job_id}
            next: jobCreateCheck
    
        - jobCreateCheck:
            switch:
              - condition: ${job_id == Null}
                next: noOpJob
            next: runLoadJob
    
        - runLoadJob:
            call: runBigQueryJob
            args:
                job_id: ${job_id}
                run_job_url: ${run_job_url}
                poll_job_url: ${poll_job_url}
            result: jobStatus
            next: loadRunCheck
    
        - loadRunCheck:
            switch:
              - condition: ${jobStatus == 2}
                next: createQueryJob
            next: failedLoadJob
    
        - createQueryJob:
            call: http.get
            args:
              url: ${create_query_url}
              query:
                  qs: "select count(*) from serverless_elt_dataset.word_count"
                  region: "US"
              auth:
                  type: OIDC
            result: queryjob
            next: setQueryJobId
    
        - setQueryJobId:
            assign:
              - qid: ${queryjob.body.job_id}
            next: queryCreateCheck
    
        - queryCreateCheck:
            switch:
              - condition: ${qid == Null}
                next: failedQueryJob
            next: runQueryJob
    
        - runQueryJob:
            call: runBigQueryJob
            args:
              job_id: ${qid}
              run_job_url: ${run_job_url}
              poll_job_url: ${poll_job_url}
            result: queryJobState
            next: runQueryCheck
    
        - runQueryCheck:
            switch:
              - condition: ${queryJobState == 2}
                next: allDone
            next: failedQueryJob
    
        - noOpJob:
            return: "No files to import"
            next: end
    
        - allDone:
            return: "All done!"
            next: end
    
        - failedQueryJob:
            return: "Query job failed"
            next: end
    
        - failedLoadJob:
            return: "Load job failed"
            next: end
    
    
    runBigQueryJob:
      params: [job_id, run_job_url, poll_job_url]
      steps:
        - startBigQueryJob:
            try:
              call: http.get
              args:
                  url: ${run_job_url}
                  query:
                    job_id: ${job_id}
                  auth:
                    type: OIDC
                  timeout: 600
              result: submitJobState
            retry: ${http.default_retry}
            next: validateSubmit
    
        - validateSubmit:
            switch:
              - condition: ${submitJobState.body.status == 1}
                next: sleepAndPollLoad
            next: returnState
    
        - returnState:
            return: ${submitJobState.body.status}
    
        - sleepAndPollLoad:
            call: sys.sleep
            args:
              seconds: 5
            next: pollJob
    
        - pollJob:
            try:
              call: http.get
              args:
                url: ${poll_job_url}
                query:
                  job_id: ${job_id}
                auth:
                  type: OIDC
                timeout: 600
              result: pollJobState
            retry:
              predicate: ${http.default_retry_predicate}
              max_retries: 10
              backoff:
                initial_delay: 1
                max_delay: 60
                multiplier: 2
            next: stateCheck
    
        - stateCheck:
            switch:
              - condition: ${pollJobState.body.status == 2}
                return: ${pollJobState.body.status}
              - condition: ${pollJobState.body.status == 3}
                return: ${pollJobState.body.status}
            next: sleepAndPollLoad

    替换以下内容:

    • CREATE_JOB_URL:用于创建新作业的函数的网址
    • POLL_BIGQUERY_JOB_URL:用于轮询运行中作业状态的函数的网址
    • RUN_BIGQUERY_JOB_URL:用于启动 BigQuery 加载作业的函数的网址
    • CREATE_QUERY_URL:用于启动 BigQuery 查询作业的函数的网址
    • BQ_REGION:存储数据的 BigQuery 区域,例如 US
    • BQ_DATASET_TABLE_NAME:BigQuery 数据集表名称,格式为 PROJECT_ID.serverless_elt_dataset.word_count
  2. 部署 workflow 文件:

    gcloud workflows deploy WORKFLOW_NAME \
        --location=WORKFLOW_REGION \
        --description='WORKFLOW_DESCRIPTION' \
        --service-account=workflow-runner@PROJECT_ID.iam.gserviceaccount.com \
        --source=workflow.yaml
    

    替换以下内容:

    • WORKFLOW_NAME:工作流的唯一名称
    • WORKFLOW_REGION:在其中部署此工作流的区域,例如 us-central1
    • WORKFLOW_DESCRIPTION:工作流的说明
  3. 创建 Python 3 虚拟环境并安装文件生成器的必备项目:

    sudo apt-get install -y python3-venv
    python3 -m venv env
    . env/bin/activate
    cd generator
    pip install -r requirements.txt
    

生成要导入的文件

gen.py Python 脚本会生成 Avro 格式的随机内容。架构与 BigQuery word_count 表相同。这些 Avro 文件会复制到指定的 Cloud Storage 存储桶中。

在 Cloud Shell 中,生成以下文件:

python gen.py -p PROJECT_ID \
    -o PROJECT_ID-ordersbucket \
    -n RECORDS_PER_FILE \
    -f NUM_FILES \
    -x FILE_PREFIX

替换以下内容:

  • RECORDS_PER_FILE:单个文件中的记录数
  • NUM_FILES:要上传的文件总数
  • FILE_PREFIX:已生成文件的名称的前缀

查看 Firestore 中的文件条目

将文件复制到 Cloud Storage 时,会触发 handle_new_file Cloud Run 函数。该函数会将文件列表添加到 Firestore jobs 集合的 new 文档的文件列表数组中。

如需查看文件列表,请在 Google Cloud 控制台中前往 Firestore 的数据页面。

前往“数据”页面

已添加到集合的文件的列表。

触发工作流

工作流将一系列无服务器任务与 Google Cloud 和 API 服务相关联。此工作流中的各个步骤以 Cloud Run 函数的形式运行,并且状态存储在 Firestore 中。所有对 Cloud Run 函数的调用都使用工作流的服务账号进行身份验证。

在 Cloud Shell 中,运行工作流:

gcloud workflows execute WORKFLOW_NAME

下图显示了在工作流中使用的步骤:

在主工作流和子工作流中使用的步骤。

工作流分为两个部分:主工作流和子工作流。主工作流处理作业创建和有条件执行,子工作流执行 BigQuery 作业。工作流会执行以下操作:

  • create_job Cloud Run 函数会创建新的作业对象、获取从 Firestore 文档添加到 Cloud Storage 的文件列表并将这些文件与加载作业相关联。如果没有要加载的文件,函数将不会创建新作业。
  • create_query Cloud Run 函数接受这样的查询:需要与应在其中执行查询的 BigQuery 区域一起执行的查询。该函数在 Firestore 中创建作业并返回作业 ID。
  • run_bigquery_job Cloud Run 函数获取需要执行的作业的 ID,然后调用 BigQuery API 来提交作业。
  • 您可以定期轮询作业状态,而不是等待 Cloud Run 函数中的作业完成。
    • poll_bigquery_job Cloud Run 函数提供作业的状态。系统会重复调用该函数,直到作业完成。
    • 如需在对 poll_bigquery_job Cloud Run 函数的调用之间添加延迟,请从 Workflows 中调用 sleep 例程

查看作业状态

您可以查看文件列表和作业的状态。

  1. 在 Google Cloud 控制台中,前往 Firestore 数据页面。

    前往“数据”页面

  2. 系统会为每个作业生成一个唯一标识符 (UUID)。如需查看 job_typestatus,请点击作业 ID。每个作业可能具有以下类型之一和状态:

    • job_type:工作流正在运行的作业的类型,值为以下值之一:

      • 0:将数据加载到 BigQuery 中。
      • 1:在 BigQuery 中运行查询。
    • status:作业的当前状态,值为以下值之一:

      • 0:作业已创建,但尚未开始。
      • 1:作业正在运行。
      • 2:作业已成功完成其执行。
      • 3:出现错误,作业未成功完成。

    作业对象还包含元数据属性,例如 BigQuery 数据集所在的区域、BigQuery 表的名称以及正在运行的查询字符串(如果作业是查询作业的话)。

突出显示了作业状态的文件列表。

在 BigQuery 中查看数据

如需确认 ELT 作业是否成功,请验证数据是否显示在表中。

  1. 在 Google Cloud 控制台中,转到 BigQuery 编辑器页面。

    前往编辑器

  2. 点击 serverless_elt_dataset.word_count 表。

  3. 点击预览标签页。

    在表格中显示数据的“预览”标签页。

安排工作流

如需定期按计划运行工作流,您可以使用 Cloud Scheduler

清理

避免产生费用的最简单方法是删除您为本教程创建的 Google Cloud 项目。或者,您也可以删除各个资源。

逐个删除资源

  1. 在 Cloud Shell 中,移除使用 Terraform 创建的所有资源:

    cd $HOME/bigquery-workflows-load
    terraform destroy \
    -var project_id=PROJECT_ID \
    -var region=REGION \
    -var zone=ZONE \
    --auto-approve
    
  2. 在 Google Cloud 控制台中,前往 Firestore 数据页面。

    前往“数据”页面

  3. 作业旁边,点击 菜单,然后选择删除

    用于删除集合的菜单路径。

删除项目

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

后续步骤