将 CI/CD 流水线用于数据处理工作流

Last reviewed 2024-06-27 UTC

本文档介绍了如何通过在 Google Cloud 上使用代管式产品实现 CI/CD 方法,设置持续集成/持续部署 (CI/CD) 流水线以处理数据。数据科学家和分析师可以调整从 CI/CD 实践中总结出来的方法,帮助确保数据流程和工作流的高质量、可维护性和适应性。您可以运用以下方法:

  • 源代码的版本控制。
  • 自动构建、测试和部署应用。
  • 与生产环境分开且隔离。
  • 可复制的环境设置程序。

本文档适用于担任下列工作的数据科学家和分析师:构建重复运行的数据处理作业以帮助他们构建研发 (R&D),从而系统性且自动维护数据处理工作负载。

架构

下图显示了 CI/CD 流水线步骤的详细视图。

CI/CD 流水线的架构图。

测试环境和生产环境的部署分为两个不同的 Cloud Build 流水线(测试流水线和生产流水线)。

在上图中,测试流水线包含以下步骤:

  1. 开发者对 Cloud Source Repositories 代码库的代码进行更改。
  2. 代码更改触发 Cloud Build 中的测试 build。
  3. Cloud Build 构建自动执行 JAR 文件,并将其部署到 Cloud Storage 上的测试 JAR 存储分区。
  4. Cloud Build 将测试文件部署到 Cloud Storage 上的测试文件存储分区。
  5. Cloud Build 在 Cloud Composer 中设置变量,以引用新部署的 JAR 文件。
  6. Cloud Build 测试数据处理工作流有向无环图 (DAG),并将其部署到 Cloud Storage 上的 Cloud Composer 存储桶。
  7. 此时工作流 DAG 文件已部署到 Cloud Composer。
  8. Cloud Build 触发新部署的数据处理工作流开始运行。
  9. 数据处理工作流集成测试通过后,消息会发布到 Pub/Sub,该消息包含对消息数据字段中最新自动执行 JAR(从 Airflow 变量中获取)的引用。

在上图中,生产流水线包含以下步骤:

  1. 当消息发布到 Pub/Sub 主题时,系统会触发生产部署流水线。
  2. 开发者手动批准生产部署流水线,构建运行。
  3. Cloud Build 将最新的自动执行 JAR 文件从测试 JAR 存储分区复制到 Cloud Storage 上的生产 JAR 存储分区。
  4. Cloud Build 测试生产数据处理工作流 DAG 并将其部署到 Cloud Storage 上的 Cloud Composer 存储分区。
  5. 此时生产工作流 DAG 文件已部署到 Cloud Composer。

在本参考架构文档中,生产数据处理工作流部署到与测试工作流相同的 Cloud Composer 环境中,以在一个合并视图中查看所有数据处理工作流。在此参考架构中,环境使用不同的 Cloud Storage 存储桶来保存输入和输出数据,从而进行分离。

为了完全分离环境,您需要在不同项目中创建多个 Cloud Composer 环境,这些环境默认情况下彼此分离。这种分离有助于保护您的生产环境。此方法不在本教程探讨范围之内。如需详细了解如何访问多个 Google Cloud 项目中的资源,请参阅设置服务账号权限

数据处理工作流

如需了解 Cloud Composer 如何运行数据处理工作流,请参阅用 Python 编写的 DAG。在 DAG 中,数据处理工作流的所有步骤会与它们之间的依赖关系一起定义。

CI/CD 流水线会在每个构建中自动将 DAG 定义从 Cloud Source Repositories 代码库部署到 Cloud Composer。此流程可确保 Cloud Composer 始终与最新的工作流定义保持同步,无需任何人工干预。

测试环境的 DAG 定义除了数据处理工作流之外,还定义了端到端测试步骤。测试步骤有助于确保数据处理工作流正确运行。

下图展示了数据处理工作流。

四步数据处理工作流。

数据处理工作流包括以下步骤:

  1. 在 Dataflow 中运行 WordCount 数据流程。
  2. 从 WordCount 流程中下载输出文件。WordCount 流程输出三个文件:

    • download_result_1
    • download_result_2
    • download_result_3
  3. 下载名为 download_ref_string 的参考文件。

  4. 根据参考文件验证结果。此集成测试汇总所有三份结果,并将汇总的结果与参考文件进行比较。

  5. 集成测试通过后,将消息发布到 Pub/Sub。

使用 Cloud Composer 等任务编排框架来管理数据处理工作流,这有助于缓解工作流的代码复杂性。

费用优化

在本文档中,您将使用 Google Cloud 的以下收费组件:

后续步骤