并行运行多个 BigQuery 作业


BigQuery 托管了许多公共数据集,供公众查询。在本教程中,您将创建一个并行运行多个 BigQuery 查询作业的工作流,并证明与依次串行运行作业相比,性能得到了提升。

目标

在此教程中,您将学习以下操作:

  1. Wikipedia 公共数据集运行查询,以确定特定月份浏览次数最多的图书。
  2. 部署并执行一个工作流,该工作流会依次串行运行多个 BigQuery 查询作业。
  3. 部署并执行一个工作流,该工作流使用并行迭代运行 BigQuery 作业,并并行执行普通的 for 循环。

您可以在 Google Cloud 控制台中运行以下命令,也可以在终端或 Cloud Shell 中使用 Google Cloud CLI 运行这些命令。

费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

准备工作

您的组织定义的安全限制条件可能会导致您无法完成以下步骤。如需了解相关问题排查信息,请参阅在受限的 Google Cloud 环境中开发应用

控制台

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Workflows API.

    Enable the API

  5. Create a service account:

    1. In the Google Cloud console, go to the Create service account page.

      Go to Create service account
    2. Select your project.
    3. In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.

      In the Service account description field, enter a description. For example, Service account for quickstart.

    4. Click Create and continue.
    5. Grant the following roles to the service account: BigQuery > BigQuery Job User, Logging > Logs Writer.

      To grant a role, find the Select a role list, then select the role.

      To grant additional roles, click Add another role and add each additional role.

    6. Click Continue.
    7. Click Done to finish creating the service account.

  6. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  7. Make sure that billing is enabled for your Google Cloud project.

  8. Enable the Workflows API.

    Enable the API

  9. Create a service account:

    1. In the Google Cloud console, go to the Create service account page.

      Go to Create service account
    2. Select your project.
    3. In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.

      In the Service account description field, enter a description. For example, Service account for quickstart.

    4. Click Create and continue.
    5. Grant the following roles to the service account: BigQuery > BigQuery Job User, Logging > Logs Writer.

      To grant a role, find the Select a role list, then select the role.

      To grant additional roles, click Add another role and add each additional role.

    6. Click Continue.
    7. Click Done to finish creating the service account.

gcloud

  1. Sign in to your Google Account.

    If you don't already have one, sign up for a new account.

  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Workflows API:

    gcloud services enable workflows.googleapis.com
  7. Set up authentication:

    1. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    2. Grant roles to the service account. Run the following command once for each of the following IAM roles: roles/bigquery.jobUser, roles/logging.logWriter:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
      • ROLE: the role to grant
  8. Install the Google Cloud CLI.
  9. To initialize the gcloud CLI, run the following command:

    gcloud init
  10. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  11. Make sure that billing is enabled for your Google Cloud project.

  12. Enable the Workflows API:

    gcloud services enable workflows.googleapis.com
  13. Set up authentication:

    1. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    2. Grant roles to the service account. Run the following command once for each of the following IAM roles: roles/bigquery.jobUser, roles/logging.logWriter:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
      • ROLE: the role to grant

运行 BigQuery 查询作业

在 BigQuery 中,您可以运行交互式(按需)查询作业。如需了解详情,请参阅运行交互式查询作业和批量查询作业

控制台

  1. 在 Google Cloud 控制台中,转到 BigQuery 页面。

    转到 BigQuery

  2. 查询编辑器文本区域中输入以下 BigQuery SQL 查询:

    SELECT TITLE, SUM(views)
    FROM `bigquery-samples.wikipedia_pageviews.201207h`
    GROUP BY TITLE
    ORDER BY SUM(views) DESC
    LIMIT 100
    
  3. 点击运行

bq

在终端中,输入以下 bq query 命令,以使用标准 SQL 语法运行交互式查询:

bq query \
--use_legacy_sql=false \
'SELECT
  TITLE, SUM(views)
FROM
  `bigquery-samples.wikipedia_pageviews.201207h`
GROUP BY
  TITLE
ORDER BY
  SUM(views) DESC
LIMIT 100'

这会运行一个查询,返回特定月份内浏览量最多的前 100 个维基百科条目,并将输出写入临时表。

记下查询运行所需的时间。

部署以串行方式运行多个查询的工作流

工作流定义由一系列使用 Workflows 语法描述的步骤组成。创建工作流后,可以进行部署,使其可以执行。部署步骤还会验证源文件是否可以执行。

以下工作流定义了五个表的列表,以便使用 Workflows BigQuery 连接器对其运行查询。这些查询会依次串行运行,每个表中观看次数最多的影视内容会保存到结果映射中。

控制台

  1. 在 Google Cloud 控制台中,转到 Workflows 页面:

    转到 Workflows

  2. 点击创建

  3. 输入新工作流的名称,例如 workflow-serial-bqjobs

  4. 选择适当的区域;例如 us-central1

  5. 选择您之前创建的服务账号

    您应该已向服务账号授予 BigQuery> BigQuery Job UserLogging> Logs Writer IAM 角色。

  6. 点击下一步

  7. 在工作流编辑器中,输入工作流的定义:

    main:
        steps:
        - init:
            assign:
                - results : {} # result from each iteration keyed by table name
                - tables:
                    - 201201h
                    - 201202h
                    - 201203h
                    - 201204h
                    - 201205h
        - runQueries:
            for:
                value: table
                in: ${tables}
                steps:
                - logTable:
                    call: sys.log
                    args:
                        text: ${"Running query for table " + table}
                - runQuery:
                    call: googleapis.bigquery.v2.jobs.query
                    args:
                        projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                        body:
                            useLegacySql: false
                            useQueryCache: false
                            timeoutMs: 30000
                            # Find top 100 titles with most views on Wikipedia
                            query: ${
                                "SELECT TITLE, SUM(views)
                                FROM `bigquery-samples.wikipedia_pageviews." + table + "`
                                WHERE LENGTH(TITLE) > 10
                                GROUP BY TITLE
                                ORDER BY SUM(VIEWS) DESC
                                LIMIT 100"
                                }
                    result: queryResult
                - returnResult:
                    assign:
                        # Return the top title from each table
                        - results[table]: {}
                        - results[table].title: ${queryResult.rows[0].f[0].v}
                        - results[table].views: ${queryResult.rows[0].f[1].v}
        - returnResults:
            return: ${results}
  8. 点击部署

gcloud

  1. 打开终端,为您的工作流创建源代码文件:

    touch workflow-serial-bqjobs.yaml
  2. 将以下工作流复制到您的源代码文件中:

    main:
        steps:
        - init:
            assign:
                - results : {} # result from each iteration keyed by table name
                - tables:
                    - 201201h
                    - 201202h
                    - 201203h
                    - 201204h
                    - 201205h
        - runQueries:
            for:
                value: table
                in: ${tables}
                steps:
                - logTable:
                    call: sys.log
                    args:
                        text: ${"Running query for table " + table}
                - runQuery:
                    call: googleapis.bigquery.v2.jobs.query
                    args:
                        projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                        body:
                            useLegacySql: false
                            useQueryCache: false
                            timeoutMs: 30000
                            # Find top 100 titles with most views on Wikipedia
                            query: ${
                                "SELECT TITLE, SUM(views)
                                FROM `bigquery-samples.wikipedia_pageviews." + table + "`
                                WHERE LENGTH(TITLE) > 10
                                GROUP BY TITLE
                                ORDER BY SUM(VIEWS) DESC
                                LIMIT 100"
                                }
                    result: queryResult
                - returnResult:
                    assign:
                        # Return the top title from each table
                        - results[table]: {}
                        - results[table].title: ${queryResult.rows[0].f[0].v}
                        - results[table].views: ${queryResult.rows[0].f[1].v}
        - returnResults:
            return: ${results}
  3. 输入以下命令以部署工作流:

    gcloud workflows deploy workflow-serial-bqjobs \
       --source=workflow-serial-bqjobs.yaml \
       --service-account=MY_SERVICE_ACCOUNT@MY_PROJECT.iam.gserviceaccount.com

    MY_SERVICE_ACCOUNT@MY_PROJECT.iam.gserviceaccount.com 替换为您之前创建的服务账号的电子邮件地址。

    您应该已向服务账号授予 roles/bigquery.jobUserroles/logging.logWriter IAM 角色。

执行工作流并串行运行多个查询

执行某个工作流会运行与该工作流关联的当前工作流定义。

控制台

  1. 在 Google Cloud 控制台中,转到 Workflows 页面:

    转到 Workflows

  2. Workflows 页面上,选择 workflow-serial-bqjobs 工作流以转到其详情页面。

  3. 工作流详细信息页面上,选择 执行

  4. 再次点击执行

  5. 输出窗格中查看工作流的结果。

gcloud

  1. 打开终端。

  2. 执行工作流:

     gcloud workflows run workflow-serial-bqjob

工作流执行过程大约需要一分钟,或前一次运行时间的五倍。结果将包含每个表,并且类似于下表:

{
  "201201h": {
    "title": "Special:Search",
    "views": "14591339"
  },
  "201202h": {
    "title": "Special:Search",
    "views": "132765420"
  },
  "201203h": {
    "title": "Special:Search",
    "views": "123316818"
  },
  "201204h": {
    "title": "Special:Search",
    "views": "116830614"
  },
  "201205h": {
    "title": "Special:Search",
    "views": "131357063"
  }
}

部署并执行并行运行多个查询的工作流

您可以通过进行一些更改,改为并行运行五个查询,而不是顺序运行:

 - runQueries:
    parallel:
        shared: [results]
        for:
            value: table
            in: ${tables}
  • parallel 步骤允许 for 循环的每个迭代并行运行。
  • results 变量声明为 shared,这样它就可以被分支写入,并且每个分支的结果都可以附加到它。

控制台

  1. 在 Google Cloud 控制台中,转到 Workflows 页面:

    转到 Workflows

  2. 点击创建

  3. 输入新工作流的名称,例如 workflow-parallel-bqjobs

  4. 选择适当的区域;例如 us-central1

  5. 选择您之前创建的服务账号

  6. 点击下一步

  7. 在工作流编辑器中,输入工作流的定义:

    main:
        steps:
        - init:
            assign:
                - results : {} # result from each iteration keyed by table name
                - tables:
                    - 201201h
                    - 201202h
                    - 201203h
                    - 201204h
                    - 201205h
        - runQueries:
            parallel:
                shared: [results]
                for:
                    value: table
                    in: ${tables}
                    steps:
                    - logTable:
                        call: sys.log
                        args:
                            text: ${"Running query for table " + table}
                    - runQuery:
                        call: googleapis.bigquery.v2.jobs.query
                        args:
                            projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                            body:
                                useLegacySql: false
                                useQueryCache: false
                                timeoutMs: 30000
                                # Find top 100 titles with most views on Wikipedia
                                query: ${
                                    "SELECT TITLE, SUM(views)
                                    FROM `bigquery-samples.wikipedia_pageviews." + table + "`
                                    WHERE LENGTH(TITLE) > 10
                                    GROUP BY TITLE
                                    ORDER BY SUM(VIEWS) DESC
                                    LIMIT 100"
                                    }
                        result: queryResult
                    - returnResult:
                        assign:
                            # Return the top title from each table
                            - results[table]: {}
                            - results[table].title: ${queryResult.rows[0].f[0].v}
                            - results[table].views: ${queryResult.rows[0].f[1].v}
        - returnResults:
            return: ${results}
  8. 点击部署

  9. 工作流详细信息页面上,选择 执行

  10. 再次点击执行

  11. 输出窗格中查看工作流的结果。

gcloud

  1. 打开终端,为您的工作流创建源代码文件:

    touch workflow-parallel-bqjobs.yaml
  2. 将以下工作流复制到您的源代码文件中:

    main:
        steps:
        - init:
            assign:
                - results : {} # result from each iteration keyed by table name
                - tables:
                    - 201201h
                    - 201202h
                    - 201203h
                    - 201204h
                    - 201205h
        - runQueries:
            parallel:
                shared: [results]
                for:
                    value: table
                    in: ${tables}
                    steps:
                    - logTable:
                        call: sys.log
                        args:
                            text: ${"Running query for table " + table}
                    - runQuery:
                        call: googleapis.bigquery.v2.jobs.query
                        args:
                            projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                            body:
                                useLegacySql: false
                                useQueryCache: false
                                timeoutMs: 30000
                                # Find top 100 titles with most views on Wikipedia
                                query: ${
                                    "SELECT TITLE, SUM(views)
                                    FROM `bigquery-samples.wikipedia_pageviews." + table + "`
                                    WHERE LENGTH(TITLE) > 10
                                    GROUP BY TITLE
                                    ORDER BY SUM(VIEWS) DESC
                                    LIMIT 100"
                                    }
                        result: queryResult
                    - returnResult:
                        assign:
                            # Return the top title from each table
                            - results[table]: {}
                            - results[table].title: ${queryResult.rows[0].f[0].v}
                            - results[table].views: ${queryResult.rows[0].f[1].v}
        - returnResults:
            return: ${results}
  3. 输入以下命令以部署工作流:

    gcloud workflows deploy workflow-parallell-bqjobs \
       --source=workflow-parallel-bqjobs.yaml \
       --service-account=MY_SERVICE_ACCOUNT@MY_PROJECT.iam.gserviceaccount.com

    MY_SERVICE_ACCOUNT@MY_PROJECT.iam.gserviceaccount.com 替换为您之前创建的服务账号的电子邮件地址。

  4. 执行工作流:

     gcloud workflows run workflow-parallel-bqjobs

结果将与上一个输出类似,但工作流执行时间应该不超过 20 秒!

清理

如果您为本教程创建了一个新项目,请删除项目。 如果您使用的是现有项目,希望保留此项目且不保留本教程中添加的任何更改,请删除为教程创建的资源

删除项目

为了避免产生费用,最简单的方法是删除您为本教程创建的项目。

要删除项目,请执行以下操作:

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

删除教程资源

删除在本教程中创建的工作流:

gcloud workflows delete WORKFLOW_NAME

后续步骤