Elaborare i dati collettivamente con Dataflow
Questa pagina fornisce esempi di come utilizzare Dataflow per eseguire operazioni collettive di Firestore in una pipeline Apache Beam. Apache Beam supporta un connettore per Firestore. Puoi utilizzare questo collegatore per eseguire operazioni in batch e in streaming in Dataflow.
Consigliamo di utilizzare Dataflow e Apache Beam per i carichi di lavoro di elaborazione dei dati su larga scala.
Il connettore Firestore per Apache Beam è disponibile in Java. Per ulteriori informazioni sul connettore Firestore, consulta l'SDK Apache Beam per Java.
Prima di iniziare
Prima di leggere questa pagina, devi conoscere il modello di programmazione per Apache Beam.
Per eseguire i sample, devi attivare l'API Dataflow:
Pipeline di Firestore di esempio
Gli esempi riportati di seguito mostrano una pipeline che scrive i dati e un'altra che li legge e li filtra. Puoi utilizzare questi esempi come punto di partenza per le tue pipeline.
Esecuzione delle pipeline di esempio
Il codice sorgente degli esempi è disponibile nel repository GitHub googleapis/java-firestore. Per eseguire questi esempi, scarica il codice sorgente e consulta il file README.
Esempio di pipeline Write
L'esempio seguente crea documenti nella raccolta cities-beam-sample
:
public class ExampleFirestoreBeamWrite { private static final FirestoreOptions FIRESTORE_OPTIONS = FirestoreOptions.getDefaultInstance(); public static void main(String[] args) { runWrite(args, "cities-beam-sample"); } public static void runWrite(String[] args, String collectionId) { // create pipeline options from the passed in arguments PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class); Pipeline pipeline = Pipeline.create(options); RpcQosOptions rpcQosOptions = RpcQosOptions.newBuilder() .withHintMaxNumWorkers(options.as(DataflowPipelineOptions.class).getMaxNumWorkers()) .build(); // create some writes Write write1 = Write.newBuilder() .setUpdate( Document.newBuilder() // resolves to // projects/<projectId>/databases/<databaseId>/documents/<collectionId>/NYC .setName(createDocumentName(collectionId, "NYC")) .putFields("name", Value.newBuilder().setStringValue("New York City").build()) .putFields("state", Value.newBuilder().setStringValue("New York").build()) .putFields("country", Value.newBuilder().setStringValue("USA").build())) .build(); Write write2 = Write.newBuilder() .setUpdate( Document.newBuilder() // resolves to // projects/<projectId>/databases/<databaseId>/documents/<collectionId>/TOK .setName(createDocumentName(collectionId, "TOK")) .putFields("name", Value.newBuilder().setStringValue("Tokyo").build()) .putFields("country", Value.newBuilder().setStringValue("Japan").build()) .putFields("capital", Value.newBuilder().setBooleanValue(true).build())) .build(); // batch write the data pipeline .apply(Create.of(write1, write2)) .apply(FirestoreIO.v1().write().batchWrite().withRpcQosOptions(rpcQosOptions).build()); // run the pipeline pipeline.run().waitUntilFinish(); } private static String createDocumentName(String collectionId, String cityDocId) { String documentPath = String.format( "projects/%s/databases/%s/documents", FIRESTORE_OPTIONS.getProjectId(), FIRESTORE_OPTIONS.getDatabaseId()); return documentPath + "/" + collectionId + "/" + cityDocId; } }
L'esempio utilizza i seguenti argomenti per configurare ed eseguire una pipeline:
GOOGLE_CLOUD_PROJECT=project-id REGION=region TEMP_LOCATION=gs://temp-bucket/temp/ NUM_WORKERS=number-workers MAX_NUM_WORKERS=max-number-workers
Esempio di pipeline Read
La pipeline di esempio seguente legge i documenti della raccolta cities-beam-sample
, applica un filtro per i documenti in cui il campo country
è impostato su
USA
e restituisce i nomi dei documenti corrispondenti.
public class ExampleFirestoreBeamRead { public static void main(String[] args) { runRead(args, "cities-beam-sample"); } public static void runRead(String[] args, String collectionId) { FirestoreOptions firestoreOptions = FirestoreOptions.getDefaultInstance(); PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class); Pipeline pipeline = Pipeline.create(options); RpcQosOptions rpcQosOptions = RpcQosOptions.newBuilder() .withHintMaxNumWorkers(options.as(DataflowPipelineOptions.class).getMaxNumWorkers()) .build(); pipeline .apply(Create.of(collectionId)) .apply( new FilterDocumentsQuery( firestoreOptions.getProjectId(), firestoreOptions.getDatabaseId())) .apply(FirestoreIO.v1().read().runQuery().withRpcQosOptions(rpcQosOptions).build()) .apply( ParDo.of( // transform each document to its name new DoFn<RunQueryResponse, String>() { @ProcessElement public void processElement(ProcessContext c) { c.output(Objects.requireNonNull(c.element()).getDocument().getName()); } })) .apply( ParDo.of( // print the document name new DoFn<String, Void>() { @ProcessElement public void processElement(ProcessContext c) { System.out.println(c.element()); } })); pipeline.run().waitUntilFinish(); } private static final class FilterDocumentsQuery extends PTransform<PCollection<String>, PCollection<RunQueryRequest>> { private final String projectId; private final String databaseId; public FilterDocumentsQuery(String projectId, String databaseId) { this.projectId = projectId; this.databaseId = databaseId; } @Override public PCollection<RunQueryRequest> expand(PCollection<String> input) { return input.apply( ParDo.of( new DoFn<String, RunQueryRequest>() { @ProcessElement public void processElement(ProcessContext c) { // select from collection "cities-collection-<uuid>" StructuredQuery.CollectionSelector collection = StructuredQuery.CollectionSelector.newBuilder() .setCollectionId(Objects.requireNonNull(c.element())) .build(); // filter where country is equal to USA StructuredQuery.Filter countryFilter = StructuredQuery.Filter.newBuilder() .setFieldFilter( StructuredQuery.FieldFilter.newBuilder() .setField( StructuredQuery.FieldReference.newBuilder() .setFieldPath("country") .build()) .setValue(Value.newBuilder().setStringValue("USA").build()) .setOp(StructuredQuery.FieldFilter.Operator.EQUAL)) .buildPartial(); RunQueryRequest runQueryRequest = RunQueryRequest.newBuilder() .setParent(DocumentRootName.format(projectId, databaseId)) .setStructuredQuery( StructuredQuery.newBuilder() .addFrom(collection) .setWhere(countryFilter) .build()) .build(); c.output(runQueryRequest); } })); } } }
L'esempio utilizza i seguenti argomenti per configurare ed eseguire una pipeline:
GOOGLE_CLOUD_PROJECT=project-id REGION=region TEMP_LOCATION=gs://temp-bucket/temp/ NUM_WORKERS=number-workers MAX_NUM_WORKERS=max-number-workers
Prezzi
L'esecuzione di un carico di lavoro Firestore in Dataflow comporta costi per l'utilizzo di Firestore e Dataflow. L'utilizzo di Dataflow viene fatturato in base alle risorse utilizzate dai tuoi job. Per informazioni dettagliate, consulta la pagina dei prezzi di Dataflow. Per i prezzi di Firestore, consulta la pagina dei prezzi.
Passaggi successivi
- Per un altro esempio di pipeline, consulta Utilizzare Firestore e Apache Beam per l'elaborazione dei dati.
- Per saperne di più su Dataflow e Apache Beam, consulta la documentazione di Dataflow.