使用 Dataflow 批量处理数据
本页面举例说明了如何使用 Dataflow 在 Apache Beam 流水线中执行批量 Firestore 操作。Apache Beam 支持 Firestore 连接器。您可以使用此连接器在 Dataflow 中运行批量操作和流式操作。
我们建议将 Dataflow 和 Apache Beam 用于大规模的数据处理工作负载。
适用于 Apache Beam 的 Firestore 连接器支持 Java。如需详细了解 Firestore 连接器,请参阅 Java 版 Apache Beam SDK。
准备工作
在阅读本页内容之前,您应该先熟悉 Apache Beam 的编程模型。
如需运行示例,您必须启用 Dataflow API:
Firestore 流水线示例
下面的示例演示了一个用于写入数据的流水线和一个用于读取数据和过滤数据的流水线。您可以使用这些示例作为基础来创建自己的流水线。
运行示例流水线
这些示例的源代码可在“googleapis/java-firestore”GitHub 代码库中找到。如需运行这些示例,请下载相应源代码并查看自述文件。
Write
流水线示例
下面的示例会在 cities-beam-sample
集合中创建一些文档:
public class ExampleFirestoreBeamWrite { private static final FirestoreOptions FIRESTORE_OPTIONS = FirestoreOptions.getDefaultInstance(); public static void main(String[] args) { runWrite(args, "cities-beam-sample"); } public static void runWrite(String[] args, String collectionId) { // create pipeline options from the passed in arguments PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class); Pipeline pipeline = Pipeline.create(options); RpcQosOptions rpcQosOptions = RpcQosOptions.newBuilder() .withHintMaxNumWorkers(options.as(DataflowPipelineOptions.class).getMaxNumWorkers()) .build(); // create some writes Write write1 = Write.newBuilder() .setUpdate( Document.newBuilder() // resolves to // projects/<projectId>/databases/<databaseId>/documents/<collectionId>/NYC .setName(createDocumentName(collectionId, "NYC")) .putFields("name", Value.newBuilder().setStringValue("New York City").build()) .putFields("state", Value.newBuilder().setStringValue("New York").build()) .putFields("country", Value.newBuilder().setStringValue("USA").build())) .build(); Write write2 = Write.newBuilder() .setUpdate( Document.newBuilder() // resolves to // projects/<projectId>/databases/<databaseId>/documents/<collectionId>/TOK .setName(createDocumentName(collectionId, "TOK")) .putFields("name", Value.newBuilder().setStringValue("Tokyo").build()) .putFields("country", Value.newBuilder().setStringValue("Japan").build()) .putFields("capital", Value.newBuilder().setBooleanValue(true).build())) .build(); // batch write the data pipeline .apply(Create.of(write1, write2)) .apply(FirestoreIO.v1().write().batchWrite().withRpcQosOptions(rpcQosOptions).build()); // run the pipeline pipeline.run().waitUntilFinish(); } private static String createDocumentName(String collectionId, String cityDocId) { String documentPath = String.format( "projects/%s/databases/%s/documents", FIRESTORE_OPTIONS.getProjectId(), FIRESTORE_OPTIONS.getDatabaseId()); return documentPath + "/" + collectionId + "/" + cityDocId; } }
该示例使用以下参数来配置和运行流水线:
GOOGLE_CLOUD_PROJECT=project-id REGION=region TEMP_LOCATION=gs://temp-bucket/temp/ NUM_WORKERS=number-workers MAX_NUM_WORKERS=max-number-workers
Read
流水线示例
下面的示例流水线会从 cities-beam-sample
集合中读取文档,然后应用一个过滤条件来查找 country
字段设为 USA
的文档,并返回匹配文档的名称。
public class ExampleFirestoreBeamRead { public static void main(String[] args) { runRead(args, "cities-beam-sample"); } public static void runRead(String[] args, String collectionId) { FirestoreOptions firestoreOptions = FirestoreOptions.getDefaultInstance(); PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class); Pipeline pipeline = Pipeline.create(options); RpcQosOptions rpcQosOptions = RpcQosOptions.newBuilder() .withHintMaxNumWorkers(options.as(DataflowPipelineOptions.class).getMaxNumWorkers()) .build(); pipeline .apply(Create.of(collectionId)) .apply( new FilterDocumentsQuery( firestoreOptions.getProjectId(), firestoreOptions.getDatabaseId())) .apply(FirestoreIO.v1().read().runQuery().withRpcQosOptions(rpcQosOptions).build()) .apply( ParDo.of( // transform each document to its name new DoFn<RunQueryResponse, String>() { @ProcessElement public void processElement(ProcessContext c) { c.output(Objects.requireNonNull(c.element()).getDocument().getName()); } })) .apply( ParDo.of( // print the document name new DoFn<String, Void>() { @ProcessElement public void processElement(ProcessContext c) { System.out.println(c.element()); } })); pipeline.run().waitUntilFinish(); } private static final class FilterDocumentsQuery extends PTransform<PCollection<String>, PCollection<RunQueryRequest>> { private final String projectId; private final String databaseId; public FilterDocumentsQuery(String projectId, String databaseId) { this.projectId = projectId; this.databaseId = databaseId; } @Override public PCollection<RunQueryRequest> expand(PCollection<String> input) { return input.apply( ParDo.of( new DoFn<String, RunQueryRequest>() { @ProcessElement public void processElement(ProcessContext c) { // select from collection "cities-collection-<uuid>" StructuredQuery.CollectionSelector collection = StructuredQuery.CollectionSelector.newBuilder() .setCollectionId(Objects.requireNonNull(c.element())) .build(); // filter where country is equal to USA StructuredQuery.Filter countryFilter = StructuredQuery.Filter.newBuilder() .setFieldFilter( StructuredQuery.FieldFilter.newBuilder() .setField( StructuredQuery.FieldReference.newBuilder() .setFieldPath("country") .build()) .setValue(Value.newBuilder().setStringValue("USA").build()) .setOp(StructuredQuery.FieldFilter.Operator.EQUAL)) .buildPartial(); RunQueryRequest runQueryRequest = RunQueryRequest.newBuilder() .setParent(DocumentRootName.format(projectId, databaseId)) .setStructuredQuery( StructuredQuery.newBuilder() .addFrom(collection) .setWhere(countryFilter) .build()) .build(); c.output(runQueryRequest); } })); } } }
该示例使用以下参数来配置和运行流水线:
GOOGLE_CLOUD_PROJECT=project-id REGION=region TEMP_LOCATION=gs://temp-bucket/temp/ NUM_WORKERS=number-workers MAX_NUM_WORKERS=max-number-workers
价格
在 Dataflow 中运行 Firestore 工作负载会产生 Firestore 用量和 Dataflow 用量相关的费用。您需要为作业使用的资源支付 Dataflow 用量相关的费用。如需了解详情,请参阅 Dataflow 价格页面。如需了解 Firestore 价格,请参阅价格页面。
后续步骤
- 如需查看其他流水线示例,请参阅使用 Firestore 和 Apache Beam 进行数据处理。
- 如需详细了解 Dataflow 和 Apache Beam,请参阅 Dataflow 文档。