Der Hadoop BigQuery-Connector wird standardmäßig auf allen Dataproc 1.0-1.2-Clusterknoten unter /usr/lib/hadoop/lib/
installiert.
Er ist sowohl in Spark- als auch in PySpark-Umgebungen verfügbar.
Dataproc-Image-Versionen 1.5 und höher: Der BigQuery-Connector wird nicht standardmäßig in den Dataproc -Image-Versionen 1.5 und höher installiert. So verwenden Sie den Connector mit diesen Versionen:
Installieren Sie den BigQuery-Connector mithilfe der Initialisierungsaktion.
Geben Sie beim Senden eines Jobs den BigQuery-Connector im
jars
-Parameter an:--jars=gs://hadoop-lib/bigquery/bigquery-connector-hadoop3-latest.jar
Fügen Sie die BigQuery-Connector-Klassen zur jar-with-dependencies der Anwendung hinzu.
Konflikte vermeiden: Wenn Ihre Anwendung eine andere Connector-Version als die in Ihrem Dataproc-Cluster bereitgestellte Connector-Version verwendet, müssen Sie entweder:
Erstellen Sie einen neuen Cluster unter Verwendung einer Initialisierungsaktion, mit der die von Ihrer Anwendung verwendete Connector-Version installiert wird.
Fügen Sie die Connector-Klassen und Connector-Abhängigkeiten für die von Ihnen verwendete Version in die JAR-Datei Ihrer Anwendung ein und verschieben Sie sie, um Konflikte zwischen Ihrer Connector-Version und der in Ihrem Dataproc-Cluster bereitgestellten Connector-Version zu vermeiden. Weitere Informationen finden Sie in diesem Beispiel für die Verschiebung von Abhängigkeiten in Maven.
GsonBigQueryInputFormat-Klasse
GsonBigQueryInputFormat
stellt Hadoop BigQuery-Objekte in einem JsonObject-Format über die folgenden primären Vorgänge bereit:
- Verwenden einer benutzerdefinierten Abfrage zum Auswählen von BigQuery-Objekten.
- Gleichmäßiges Aufteilen der Ergebnisse der Abfrage auf Hadoop-Knoten.
- Parsen der Splitergebnisse in Java-Objekte zur Übergabe an den Mapper.
Die Hadoop-Mapper-Klasse erhält eine
JsonObject
-Darstellung für jedes ausgewählte BigQuery-Objekt.
Die BigQueryInputFormat
-Klasse bietet Zugriff auf BigQuery-Datensätze über eine Erweiterung der Hadoop InputFormat-Klasse. So verwenden Sie die BigQueryInputFormat-Klasse:
Zeilen müssen dem Hauptjob von Hadoop hinzugefügt werden, um Parameter in der Hadoop-Konfiguration festzulegen.
Die InputFormat-Klasse muss auf
GsonBigQueryInputFormat
festgelegt werden.
In den folgenden Abschnitten erfahren Sie, wie Sie diese Anforderungen erfüllen.
Eingabeparameter
- QualifiedInputTableId
- Die BigQuery-Tabelle, aus der gelesen werden soll, im Format:
optional-projectId:datasetId.tableId
Beispiel:publicdata:samples.shakespeare
- projectId
- Die Bezeichnung des BigQuery-Projekts, unter dem alle Eingabevorgänge stattfinden.
Beispiel:my-first-cloud-project
// Set the job-level projectId. conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId); // Configure input parameters. BigQueryConfiguration.configureBigQueryInput(conf, inputQualifiedTableId); // Set InputFormat. job.setInputFormatClass(GsonBigQueryInputFormat.class);
Hinweise:
job
bezieht sich auforg.apache.hadoop.mapreduce.Job
, den auszuführenden Hadoop-Job.conf
bezieht sich auforg.apache.hadoop.Configuration
für den Hadoop-Job.
Mapper
Die GsonBigQueryInputFormat
-Klasse liest aus BigQuery und gibt BigQuery-Objekte nacheinander als Eingabe an die Hadoop-Funktion Mapper
weiter. Die Eingaben haben die Form eines Paars, das Folgendes enthält:
LongWritable
, die DatensatznummerJsonObject
, der BigQuery-Datensatz im JSON-Format
Die Mapper
akzeptiert LongWritable
und JsonObject pair
als Eingabe.
Hier ist ein Snippet aus dem Mapper
für einen Beispiel-WordCount-Job.
// private static final LongWritable ONE = new LongWritable(1); // The configuration key used to specify the BigQuery field name // ("column name"). public static final String WORDCOUNT_WORD_FIELDNAME_KEY = "mapred.bq.samples.wordcount.word.key"; // Default value for the configuration entry specified by // WORDCOUNT_WORD_FIELDNAME_KEY. Examples: 'word' in // publicdata:samples.shakespeare or 'repository_name' // in publicdata:samples.github_timeline. public static final String WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT = "word"; /** * The mapper function for WordCount. */ public static class Map extends Mapper <LongWritable, JsonObject, Text, LongWritable> { private static final LongWritable ONE = new LongWritable(1); private Text word = new Text(); private String wordKey; @Override public void setup(Context context) throws IOException, InterruptedException { // Find the runtime-configured key for the field name we're looking for // in the map task. Configuration conf = context.getConfiguration(); wordKey = conf.get(WORDCOUNT_WORD_FIELDNAME_KEY, WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT); } @Override public void map(LongWritable key, JsonObject value, Context context) throws IOException, InterruptedException { JsonElement countElement = value.get(wordKey); if (countElement != null) { String wordInRecord = countElement.getAsString(); word.set(wordInRecord); // Write out the key, value pair (write out a value of 1, which will be // added to the total count for this word in the Reducer). context.write(word, ONE); } } }
IndirectBigQueryOutputFormat-Klasse
IndirectBigQueryOutputFormat
bietet Hadoop die Möglichkeit, JsonObject
-Werte direkt in eine BigQuery-Tabelle zu schreiben. Diese Klasse bietet über eine Erweiterung der Hadoop-Klasse OutputFormat Zugriff auf BigQuery-Datensätze. Zur korrekten Verwendung müssen mehrere Parameter in der Hadoop-Konfiguration festgelegt werden und die OutputFormat-Klasse muss auf IndirectBigQueryOutputFormat
festgelegt sein. Im Folgenden finden Sie ein Beispiel für die notwendigen Parameter und Codezeilen zur korrekten Verwendung von IndirectBigQueryOutputFormat
.
Ausgabeparameter
- projectId
- Die Bezeichnung des BigQuery-Projekts, unter dem alle Ausgabevorgänge stattfinden.
Beispiel: "my-first-cloud-project" - QualifiedOutputTableId
- Das BigQuery-Dataset, in das die endgültigen Jobergebnisse geschrieben werden sollen, im Format optional-projectId:datasetId.tableId. Die Dataset-ID sollte im Projekt bereits vorhanden sein.
Das Dataset outputDatasetId_hadoop_temporary wird in BigQuery für die Aufnahme temporärer Ergebnisse erstellt. Überprüfen Sie, dass es nicht mit vorhandenen Datasets in Konflikt steht.
Beispiele:
test_output_dataset.wordcount_output
my-first-cloud-project:test_output_dataset.wordcount_output
- outputTableFieldSchema
- Ein Schema, das das Schema für die BigQuery-Ausgabetabelle definiert
- GcsOutputPath
- Der Ausgabepfad zum Speichern temporärer Cloud Storage-Daten (
gs://bucket/dir/
)
// Define the schema we will be using for the output BigQuery table. List<TableFieldSchema> outputTableFieldSchema = new ArrayList<TableFieldSchema>(); outputTableFieldSchema.add(new TableFieldSchema().setName("Word").setType("STRING")); outputTableFieldSchema.add(new TableFieldSchema().setName("Count").setType("INTEGER")); TableSchema outputSchema = new TableSchema().setFields(outputTableFieldSchema); // Create the job and get its configuration. Job job = new Job(parser.getConfiguration(), "wordcount"); Configuration conf = job.getConfiguration(); // Set the job-level projectId. conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId); // Configure input. BigQueryConfiguration.configureBigQueryInput(conf, inputQualifiedTableId); // Configure output. BigQueryOutputConfiguration.configure( conf, outputQualifiedTableId, outputSchema, outputGcsPath, BigQueryFileFormat.NEWLINE_DELIMITED_JSON, TextOutputFormat.class); // (Optional) Configure the KMS key used to encrypt the output table. BigQueryOutputConfiguration.setKmsKeyName( conf, "projects/myproject/locations/us-west1/keyRings/r1/cryptoKeys/k1"); );
Reducer
Die IndirectBigQueryOutputFormat
-Klasse schreibt in BigQuery.
Sie erhält einen Schlüssel und einen JsonObject
-Wert als Eingabe, schreibt aber nur den JsonObject-Wert in BigQuery (der Schlüssel wird ignoriert). Das JsonObject
sollte einen BigQuery-Datensatz im Json-Format enthalten. Der Reducer sollte ein Paar aus einem Schlüssel eines beliebigen Typs (in unserem Beispiel-WordCount-Job wird NullWritable
verwendet) und einem JsonObject
-Wert ausgeben. Der Reducer für den Beispiel-WordCount-Job wird unten angezeigt.
/** * Reducer function for WordCount. */ public static class Reduce extends Reducer<Text, LongWritable, JsonObject, NullWritable> { @Override public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { // Add up the values to get a total number of occurrences of our word. long count = 0; for (LongWritable val : values) { count = count + val.get(); } JsonObject jsonObject = new JsonObject(); jsonObject.addProperty("Word", key.toString()); jsonObject.addProperty("Count", count); // Key does not matter. context.write(jsonObject, NullWritable.get()); } }
Bereinigen
Bereinigen Sie nach Abschluss des Jobs die Cloud Storage-Exportpfade.
job.waitForCompletion(true); GsonBigQueryInputFormat.cleanupJob(job.getConfiguration(), job.getJobID());
Sie können die Wordcounts in der BigQuery-Ausgabetabelle der Google Cloud Console einsehen.
Vollständiger Code für einen Beispiel-WordCount-Job
Der folgende Code ist ein Beispiel für einen einfachen WordCount-Job, mit dem Wortzahlen aus Objekten in BigQuery aggregiert werden.
package com.google.cloud.hadoop.io.bigquery.samples;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat;
import com.google.cloud.hadoop.io.bigquery.GsonBigQueryInputFormat;
import com.google.cloud.hadoop.io.bigquery.output.BigQueryOutputConfiguration;
import com.google.cloud.hadoop.io.bigquery.output.IndirectBigQueryOutputFormat;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* Sample program to run the Hadoop Wordcount example over tables in BigQuery.
*/
public class WordCount {
// The configuration key used to specify the BigQuery field name
// ("column name").
public static final String WORDCOUNT_WORD_FIELDNAME_KEY =
"mapred.bq.samples.wordcount.word.key";
// Default value for the configuration entry specified by
// WORDCOUNT_WORD_FIELDNAME_KEY. Examples: 'word' in
// publicdata:samples.shakespeare or 'repository_name'
// in publicdata:samples.github_timeline.
public static final String WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT = "word";
// Guava might not be available, so define a null / empty helper:
private static boolean isStringNullOrEmpty(String toTest) {
return toTest == null || "".equals(toTest);
}
/**
* The mapper function for WordCount. For input, it consumes a LongWritable
* and JsonObject as the key and value. These correspond to a row identifier
* and Json representation of the row's values/columns.
* For output, it produces Text and a LongWritable as the key and value.
* These correspond to the word and a count for the number of times it has
* occurred.
*/
public static class Map
extends Mapper <LongWritable, JsonObject, Text, LongWritable> {
private static final LongWritable ONE = new LongWritable(1);
private Text word = new Text();
private String wordKey;
@Override
public void setup(Context context)
throws IOException, InterruptedException {
// Find the runtime-configured key for the field name we're looking for in
// the map task.
Configuration conf = context.getConfiguration();
wordKey = conf.get(WORDCOUNT_WORD_FIELDNAME_KEY, WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT);
}
@Override
public void map(LongWritable key, JsonObject value, Context context)
throws IOException, InterruptedException {
JsonElement countElement = value.get(wordKey);
if (countElement != null) {
String wordInRecord = countElement.getAsString();
word.set(wordInRecord);
// Write out the key, value pair (write out a value of 1, which will be
// added to the total count for this word in the Reducer).
context.write(word, ONE);
}
}
}
/**
* Reducer function for WordCount. For input, it consumes the Text and
* LongWritable that the mapper produced. For output, it produces a JsonObject
* and NullWritable. The JsonObject represents the data that will be
* loaded into BigQuery.
*/
public static class Reduce
extends Reducer<Text, LongWritable, JsonObject, NullWritable> {
@Override
public void reduce(Text key, Iterable<LongWritable> values, Context context)
throws IOException, InterruptedException {
// Add up the values to get a total number of occurrences of our word.
long count = 0;
for (LongWritable val : values) {
count = count + val.get();
}
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty("Word", key.toString());
jsonObject.addProperty("Count", count);
// Key does not matter.
context.write(jsonObject, NullWritable.get());
}
}
/**
* Configures and runs the main Hadoop job. Takes a String[] of 5 parameters:
* [ProjectId] [QualifiedInputTableId] [InputTableFieldName]
* [QualifiedOutputTableId] [GcsOutputPath]
*
* ProjectId - Project under which to issue the BigQuery
* operations. Also serves as the default project for table IDs that don't
* specify a project for the table.
*
* QualifiedInputTableId - Input table ID of the form
* (Optional ProjectId):[DatasetId].[TableId]
*
* InputTableFieldName - Name of the field to count in the
* input table, e.g., 'word' in publicdata:samples.shakespeare or
* 'repository_name' in publicdata:samples.github_timeline.
*
* QualifiedOutputTableId - Input table ID of the form
* (Optional ProjectId):[DatasetId].[TableId]
*
* GcsOutputPath - The output path to store temporary
* Cloud Storage data, e.g., gs://bucket/dir/
*
* @param args a String[] containing ProjectId, QualifiedInputTableId,
* InputTableFieldName, QualifiedOutputTableId, and GcsOutputPath.
* @throws IOException on IO Error.
* @throws InterruptedException on Interrupt.
* @throws ClassNotFoundException if not all classes are present.
*/
public static void main(String[] args)
throws IOException, InterruptedException, ClassNotFoundException {
// GenericOptionsParser is a utility to parse command line arguments
// generic to the Hadoop framework. This example doesn't cover the specifics,
// but recognizes several standard command line arguments, enabling
// applications to easily specify a NameNode, a ResourceManager, additional
// configuration resources, etc.
GenericOptionsParser parser = new GenericOptionsParser(args);
args = parser.getRemainingArgs();
// Make sure we have the right parameters.
if (args.length != 5) {
System.out.println(
"Usage: hadoop jar bigquery_wordcount.jar [ProjectId] [QualifiedInputTableId] "
+ "[InputTableFieldName] [QualifiedOutputTableId] [GcsOutputPath]\n"
+ " ProjectId - Project under which to issue the BigQuery operations. Also serves "
+ "as the default project for table IDs that don't explicitly specify a project for "
+ "the table.\n"
+ " QualifiedInputTableId - Input table ID of the form "
+ "(Optional ProjectId):[DatasetId].[TableId]\n"
+ " InputTableFieldName - Name of the field to count in the input table, e.g., "
+ "'word' in publicdata:samples.shakespeare or 'repository_name' in "
+ "publicdata:samples.github_timeline.\n"
+ " QualifiedOutputTableId - Input table ID of the form "
+ "(Optional ProjectId):[DatasetId].[TableId]\n"
+ " GcsOutputPath - The output path to store temporary Cloud Storage data, e.g., "
+ "gs://bucket/dir/");
System.exit(1);
}
// Get the individual parameters from the command line.
String projectId = args[0];
String inputQualifiedTableId = args[1];
String inputTableFieldId = args[2];
String outputQualifiedTableId = args[3];
String outputGcsPath = args[4];
// Define the schema we will be using for the output BigQuery table.
List<TableFieldSchema> outputTableFieldSchema = new ArrayList<TableFieldSchema>();
outputTableFieldSchema.add(new TableFieldSchema().setName("Word").setType("STRING"));
outputTableFieldSchema.add(new TableFieldSchema().setName("Count").setType("INTEGER"));
TableSchema outputSchema = new TableSchema().setFields(outputTableFieldSchema);
// Create the job and get its configuration.
Job job = new Job(parser.getConfiguration(), "wordcount");
Configuration conf = job.getConfiguration();
// Set the job-level projectId.
conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId);
// Configure input.
BigQueryConfiguration.configureBigQueryInput(conf, inputQualifiedTableId);
// Configure output.
BigQueryOutputConfiguration.configure(
conf,
outputQualifiedTableId,
outputSchema,
outputGcsPath,
BigQueryFileFormat.NEWLINE_DELIMITED_JSON,
TextOutputFormat.class);
// (Optional) Configure the KMS key used to encrypt the output table.
BigQueryOutputConfiguration.setKmsKeyName(
conf,
"projects/myproject/locations/us-west1/keyRings/r1/cryptoKeys/k1");
conf.set(WORDCOUNT_WORD_FIELDNAME_KEY, inputTableFieldId);
// This helps Hadoop identify the Jar which contains the mapper and reducer
// by specifying a class in that Jar. This is required if the jar is being
// passed on the command line to Hadoop.
job.setJarByClass(WordCount.class);
// Tell the job what data the mapper will output.
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(GsonBigQueryInputFormat.class);
// Instead of using BigQueryOutputFormat, we use the newer
// IndirectBigQueryOutputFormat, which works by first buffering all the data
// into a Cloud Storage temporary file, and then on commitJob, copies all data from
// Cloud Storage into BigQuery in one operation. Its use is recommended for large jobs
// since it only requires one BigQuery "load" job per Hadoop/Spark job, as
// compared to BigQueryOutputFormat, which performs one BigQuery job for each
// Hadoop/Spark task.
job.setOutputFormatClass(IndirectBigQueryOutputFormat.class);
job.waitForCompletion(true);
// After the job completes, clean up the Cloud Storage export paths.
GsonBigQueryInputFormat.cleanupJob(job.getConfiguration(), job.getJobID());
// You can view word counts in the BigQuery output table at
// https://console.cloud.google.com/.
}
}
Java-Version
Der BigQuery-Connector erfordert Java 8.
Informationen zu Apache Maven Dependency
<dependency> <groupId>com.google.cloud.bigdataoss</groupId> <artifactId>bigquery-connector</artifactId> <version>insert "hadoopX-X.X.X" connector version number here</version> </dependency>
Weitere Informationen finden Sie in den Versionshinweisen zu BigQuery-Connector und in der Javadoc-Referenz.