Cloud Storage CSV files to BigQuery 流水线是一种批处理流水线,可让您从 Cloud Storage 中存储的 CSV 文件读取数据,并将结果附加到 BigQuery 表。CSV 文件可以未经压缩,也可以采用 Compression
枚举 SDK 页面中列出的格式进行压缩。
流水线要求
如需使用此模板,流水线必须满足以下要求。
BigQuery 架构 JSON 文件
创建一个用于描述 BigQuery 架构的 JSON 文件。
确保架构包含一个名为 BigQuery Schema
的顶级 JSON 数组,且该数组的内容遵循 {"name": "COLUMN_NAME", "type": "DATA_TYPE"}
格式。
Cloud Storage CSV files to BigQuery 批处理模板不支持将数据导入目标 BigQuery 表中的 STRUCT
(记录)字段。
下面的 JSON 描述了一个 BigQuery 架构示例:
{ "BigQuery Schema": [ { "name": "location", "type": "STRING" }, { "name": "name", "type": "STRING" }, { "name": "age", "type": "STRING" }, { "name": "color", "type": "STRING" }, { "name": "coffee", "type": "STRING" } ] }
错误表架构
存储 CSV 文件中被拒绝记录的 BigQuery 表必须与此处定义的表架构匹配。
{ "BigQuery Schema": [ { "name": "RawContent", "type": "STRING" }, { "name": "ErrorMsg", "type": "STRING" } ] }
模板参数
必需参数
- inputFilePattern:包含要处理的文本的 CSV 文件的 Cloud Storage 路径。例如
gs://your-bucket/path/*.csv
。 - schemaJSONPath:定义 BigQuery 架构的 JSON 文件的 Cloud Storage 路径。
- outputTable:存储已处理数据的 BigQuery 表的名称。如果您重复使用现有 BigQuery 表,则数据会被附加到目标表。
- bigQueryLoadingTemporaryDirectory:在 BigQuery 加载过程中使用的临时目录。例如
gs://your-bucket/your-files/temp_dir
。 - badRecordsOutputTable:处理 CSV 文件时用于存储被拒绝数据的 BigQuery 表的名称。如果您重复使用现有 BigQuery 表,则数据将被附加到目标表。 此表的架构必须与错误表架构匹配 (https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-storage-csv-to-bigquery#GcsCSVToBigQueryBadRecordsSchema)。
- delimiter:CSV 文件使用的列分隔符。例如
,
。 - csvFormat:基于 Apache Commons CSV 格式的 CSV 格式。默认值为
Default
。
可选参数
- containsHeaders:CSV 文件中是否包含标题。默认值为
false
。 - csvFileEncoding:CSV 文件的字符编码格式。允许的值包括
US-ASCII
、ISO-8859-1
、UTF-8
和UTF-16
。默认为 UTF-8。
运行模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the CSV files on Cloud Storage to BigQuery (Batch) template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/GCS_CSV_to_BigQuery \ --region REGION_NAME \ --parameters \ inputFilePattern=PATH_TO_CSV_DATA,\ schemaJSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\ outputTable=BIGQUERY_DESTINATION_TABLE,\ badRecordsOutputTable=BIGQUERY_BAD_RECORDS_TABLE,\ csvFormat=CSV_FORMAT,\ delimiter=DELIMITER,\ bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS,\ containsHeaders=CONTAINS_HEADERS,\ csvFileEncoding=CSV_FILE_ENCODING
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 IDJOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates-REGION_NAME/latest/) 中可用- 版本名称(如
2023-09-12-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates-REGION_NAME/) 中
REGION_NAME
:要在其中部署 Dataflow 作业的区域,例如us-central1
PATH_TO_CSV_DATA
:CSV 文件的 Cloud Storage 路径PATH_TO_BIGQUERY_SCHEMA_JSON
:包含架构定义的 JSON 文件的 Cloud Storage 路径BIGQUERY_DESTINATION_TABLE
:BigQuery 目标表的名称BIGQUERY_BAD_RECORDS_TABLE
:BigQuery 有误记录表的名称PATH_TO_TEMP_DIR_ON_GCS
:临时目录的 Cloud Storage 路径DELIMITER
:CSV 文件分隔符CSV_FORMAT
:用于解析记录的 CSV 格式规范CONTAINS_HEADERS
:CSV 文件是否包含标题CSV_FILE_ENCODING
:CSV 文件中的编码
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/GCS_CSV_to_BigQuery { "jobName": "JOB_NAME", "parameters": { "inputFilePattern":"PATH_TO_CSV_DATA", "schemaJSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON", "outputTable":"BIGQUERY_DESTINATION_TABLE", "badRecordsOutputTable":"BIGQUERY_BAD_RECORDS_TABLE", "csvFormat":"CSV_FORMAT", "delimiter":"DELIMITER", "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS", "containsHeaders": "CONTAINS_HEADERS", "csvFileEncoding": "CSV_FILE_ENCODING" }, "environment": { "zone": "us-central1-f" } }
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 IDJOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates-REGION_NAME/latest/) 中可用- 版本名称(如
2023-09-12-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates-REGION_NAME/) 中
LOCATION
:要在其中部署 Dataflow 作业的区域,例如us-central1
PATH_TO_CSV_DATA
:CSV 文件的 Cloud Storage 路径PATH_TO_BIGQUERY_SCHEMA_JSON
:包含架构定义的 JSON 文件的 Cloud Storage 路径BIGQUERY_DESTINATION_TABLE
:BigQuery 目标表的名称BIGQUERY_BAD_RECORDS_TABLE
:BigQuery 有误记录表的名称PATH_TO_TEMP_DIR_ON_GCS
:临时目录的 Cloud Storage 路径DELIMITER
:CSV 文件分隔符CSV_FORMAT
:用于解析记录的 CSV 格式规范CONTAINS_HEADERS
:CSV 文件是否包含标题CSV_FILE_ENCODING
:CSV 文件中的编码
后续步骤
- 了解 Dataflow 模板。
- 参阅 Google 提供的模板列表。