Menggunakan konektor Bigtable Spark
Konektor Bigtable Spark memungkinkan Anda membaca dan menulis data ke dan dari Bigtable. Anda dapat membaca data dari aplikasi Spark menggunakan Spark SQL dan DataFrame. Operasi Bigtable berikut didukung menggunakan konektor Bigtable Spark:
- Menulis data
- Membaca data
- Membuat tabel baru
Dokumen ini menunjukkan cara mengonversi tabel DataFrames Spark SQL menjadi tabel Bigtable, lalu mengompilasi dan membuat file JAR untuk mengirimkan tugas Spark.
Status dukungan Spark dan Scala
Konektor Bigtable Spark mendukung versi Scala berikut:
Konektor Bigtable Spark mendukung versi Spark berikut:
Konektor Bigtable Spark mendukung versi Dataproc berikut:
- Cluster versi gambar 1.5
- Cluster versi gambar 2.0
- 2.1 cluster image-version
- Cluster image-version 2.2
- Dataproc Serverless runtime versi 1.0
Menghitung biaya
Jika Anda memutuskan untuk menggunakan salah satu komponen Google Cloudyang dapat ditagih berikut, Anda akan ditagih untuk resource yang Anda gunakan:
- Bigtable (Anda tidak dikenai biaya untuk menggunakan emulator Bigtable)
- Dataproc
- Cloud Storage
Harga Dataproc berlaku untuk penggunaan cluster Dataproc di Compute Engine. Harga Dataproc Serverless berlaku untuk workload dan sesi yang dijalankan di Dataproc Serverless untuk Spark.
Untuk membuat perkiraan biaya berdasarkan proyeksi penggunaan Anda, gunakan kalkulator harga.
Sebelum memulai
Selesaikan prasyarat berikut sebelum menggunakan konektor Bigtable Spark.
Peran yang diperlukan
Untuk mendapatkan izin yang Anda perlukan untuk menggunakan konektor Bigtable Spark, minta administrator Anda untuk memberi Anda peran IAM berikut pada project Anda:
-
Bigtable Administrator (
roles/bigtable.admin
)(Opsional): memungkinkan Anda membaca atau menulis data dan membuat tabel baru. -
Bigtable User (
roles/bigtable.user
): memungkinkan Anda membaca atau menulis data, tetapi tidak memungkinkan Anda membuat tabel baru.
Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses ke project, folder, dan organisasi.
Anda mungkin juga bisa mendapatkan izin yang diperlukan melalui peran khusus atau peran bawaan lainnya.
Jika Anda menggunakan Dataproc atau Cloud Storage, izin tambahan mungkin diperlukan. Untuk mengetahui informasi selengkapnya, lihat izin Dataproc dan izin Cloud Storage.
Menyiapkan Spark
Selain membuat instance Bigtable, Anda juga perlu menyiapkan instance Spark. Anda dapat melakukannya secara lokal atau memilih salah satu opsi berikut untuk menggunakan Spark dengan Dataproc:
- Cluster Dataproc
- Dataproc Serverless
Untuk mengetahui informasi selengkapnya tentang memilih antara cluster Dataproc atau opsi serverless, lihat dokumentasi Dataproc Serverless untuk Spark dibandingkan dengan Dataproc di Compute Engine .
Download file JAR konektor
Anda dapat menemukan kode sumber konektor Bigtable Spark dengan contoh di repositori GitHub konektor Bigtable Spark.
Berdasarkan penyiapan Spark, Anda dapat mengakses file JAR sebagai berikut:
Jika menjalankan PySpark secara lokal, Anda harus mendownload file JAR konektor dari
gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar
lokasi Cloud Storage.Ganti
SCALA_VERSION
dengan2.12
atau2.13
, yang merupakan satu-satunya versi Scala yang didukung, dan gantiCONNECTOR_VERSION
dengan versi konektor yang ingin Anda gunakan.Untuk opsi cluster atau tanpa server Dataproc, gunakan file JAR terbaru sebagai artefak yang dapat ditambahkan di aplikasi Spark Scala atau Java Anda. Untuk mengetahui informasi selengkapnya tentang penggunaan file JAR sebagai artefak, lihat Mengelola dependensi.
Jika Anda mengirimkan tugas PySpark ke Dataproc, gunakan flag
gcloud dataproc jobs submit pyspark --jars
untuk menetapkan URI ke lokasi file JAR di Cloud Storage—misalnyags://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar
.
Menentukan jenis komputasi
Untuk tugas hanya baca, Anda dapat menggunakan komputasi serverless Data Boost, yang memungkinkan Anda menghindari dampak pada cluster penayangan aplikasi. Aplikasi Spark Anda harus menggunakan konektor Spark versi 1.1.0 atau yang lebih baru untuk menggunakan Peningkatan Data.
Untuk menggunakan Data Boost, Anda harus membuat profil aplikasi Data Boost, lalu memberikan ID profil aplikasi untuk opsi Spark spark.bigtable.app_profile.id
saat menambahkan konfigurasi Bigtable ke aplikasi Spark. Jika Anda telah membuat profil aplikasi untuk tugas baca Spark dan ingin terus menggunakannya tanpa mengubah kode aplikasi, Anda dapat mengonversi profil aplikasi menjadi profil aplikasi Peningkatan Data. Untuk mengetahui informasi selengkapnya, lihat Mengonversi profil aplikasi.
Untuk mengetahui informasi selengkapnya, lihat Ringkasan Peningkatan Data Bigtable.
Untuk tugas yang melibatkan pembacaan dan penulisan, Anda dapat menggunakan node cluster instance untuk komputasi dengan menentukan profil aplikasi standar dengan permintaan Anda.
Mengidentifikasi atau membuat profil aplikasi yang akan digunakan
Jika Anda tidak menentukan ID profil aplikasi, konektor akan menggunakan profil aplikasi default.
Sebaiknya gunakan profil aplikasi unik untuk setiap aplikasi yang Anda jalankan, termasuk aplikasi Spark. Untuk mengetahui informasi selengkapnya tentang jenis dan setelan profil aplikasi, lihat Ringkasan profil aplikasi. Untuk mengetahui petunjuknya, lihat Membuat dan mengonfigurasi profil aplikasi.
Menambahkan konfigurasi Bigtable ke aplikasi Spark Anda
Di aplikasi Spark, tambahkan opsi Spark yang memungkinkan Anda berinteraksi dengan Bigtable.
Opsi Spark yang didukung
Gunakan opsi Spark yang tersedia sebagai bagian dari paket com.google.cloud.spark.bigtable
.
Nama opsi | Wajib | Nilai default | Arti |
---|---|---|---|
spark.bigtable.project.id |
Ya | T/A | Tetapkan project ID Bigtable. |
spark.bigtable.instance.id |
Ya | T/A | Tetapkan ID instance Bigtable. |
catalog |
Ya | T/A | Menetapkan format JSON yang menentukan format konversi antara skema mirip SQL DataFrame dan skema tabel Bigtable. Lihat Membuat metadata tabel dalam format JSON untuk mengetahui informasi selengkapnya. |
spark.bigtable.app_profile.id |
Tidak | default |
Tetapkan ID profil aplikasi Bigtable. |
spark.bigtable.write.timestamp.milliseconds |
Tidak | Waktu sistem saat ini | Menetapkan stempel waktu dalam milidetik yang akan digunakan saat menulis DataFrame ke Bigtable. Perhatikan bahwa karena semua baris dalam DataFrame menggunakan stempel waktu yang sama, baris dengan kolom kunci baris yang sama dalam DataFrame akan tetap ada sebagai satu versi di Bigtable karena memiliki stempel waktu yang sama. |
spark.bigtable.create.new.table |
Tidak | false |
Setel ke true untuk membuat tabel baru sebelum menulis ke Bigtable. |
spark.bigtable.read.timerange.start.milliseconds atau spark.bigtable.read.timerange.end.milliseconds |
Tidak | T/A | Tetapkan stempel waktu (dalam milidetik sejak waktu epoch) untuk memfilter sel dengan tanggal mulai dan akhir tertentu. |
spark.bigtable.push.down.row.key.filters |
Tidak | true |
Setel ke true untuk mengizinkan pemfilteran kunci baris sederhana di sisi server. Pemfilteran pada kunci baris gabungan diterapkan di sisi klien.Lihat Membaca baris DataFrame tertentu menggunakan filter untuk mengetahui informasi selengkapnya. |
spark.bigtable.read.rows.attempt.timeout.milliseconds |
Tidak | 30 mnt | Tetapkan durasi timeout untuk upaya membaca baris yang sesuai dengan satu partisi DataFrame di klien Bigtable untuk Java. |
spark.bigtable.read.rows.total.timeout.milliseconds |
Tidak | 12 j | Tetapkan durasi waktu tunggu total untuk upaya membaca baris yang sesuai dengan satu partisi DataFrame di klien Bigtable untuk Java. |
spark.bigtable.mutate.rows.attempt.timeout.milliseconds |
Tidak | 1m | Menetapkan durasi timeout untuk upaya mengubah baris yang sesuai dengan satu partisi DataFrame di klien Bigtable untuk Java. |
spark.bigtable.mutate.rows.total.timeout.milliseconds |
Tidak | 10 mnt | Menetapkan durasi total waktu tunggu untuk upaya mengubah baris yang sesuai dengan satu partisi DataFrame di klien Bigtable untuk Java. |
spark.bigtable.batch.mutate.size |
Tidak | 100 |
Disetel ke jumlah mutasi di setiap batch. Nilai maksimum yang dapat Anda tetapkan adalah 100000 . |
spark.bigtable.enable.batch_mutate.flow_control |
Tidak | false |
Setel ke true untuk mengaktifkan kontrol alur untuk mutasi batch. |
Membuat metadata tabel dalam format JSON
Format tabel DataFrame Spark SQL harus dikonversi menjadi tabel Bigtable menggunakan string dengan format JSON. Format JSON string ini membuat format data kompatibel dengan Bigtable. Anda dapat meneruskan format JSON dalam kode aplikasi menggunakan opsi .option("catalog", catalog_json_string)
.
Sebagai contoh, perhatikan tabel DataFrame berikut dan tabel Bigtable yang sesuai.
Dalam contoh ini, kolom name
dan birthYear
di DataFrame dikelompokkan bersama dalam grup kolom info
dan diganti namanya menjadi name
dan birth_year
. Demikian pula, kolom address
disimpan dalam grup kolom location
dengan nama kolom yang sama. Kolom id
dari DataFrame dikonversi menjadi kunci baris Bigtable.
Kunci baris tidak memiliki nama kolom khusus di Bigtable dan dalam contoh ini, id_rowkey
hanya digunakan untuk menunjukkan kepada konektor bahwa ini adalah kolom kunci baris. Anda dapat menggunakan nama apa pun untuk kolom kunci baris dan pastikan Anda menggunakan nama yang sama saat mendeklarasikan kolom "rowkey":"column_name"
dalam format JSON.
DataFrame | Tabel Bigtable = t1 | |||||||
Kolom | Kunci baris | Grup kolom | ||||||
info | lokasi | |||||||
Kolom | Kolom | |||||||
id | name | birthYear | address | id_rowkey | name | birth_year | address |
Format JSON untuk katalog adalah sebagai berikut:
"""
{
"table": {"name": "t1"},
"rowkey": "id_rowkey",
"columns": {
"id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"},
"name": {"cf": "info", "col": "name", "type": "string"},
"birthYear": {"cf": "info", "col": "birth_year", "type": "long"},
"address": {"cf": "location", "col": "address", "type": "string"}
}
}
"""
Kunci dan nilai yang digunakan dalam format JSON adalah sebagai berikut:
Kunci katalog | Nilai katalog | Format JSON |
---|---|---|
tabel | Nama tabel Bigtable. | "table":{"name":"t1"} Jika tabel tidak ada, gunakan .option("spark.bigtable.create.new.table", "true") untuk membuat tabel. |
rowkey | Nama kolom yang akan digunakan sebagai kunci baris Bigtable. Pastikan nama kolom DataFrame digunakan sebagai kunci baris—misalnya, id_rowkey . Kunci gabungan juga diterima sebagai kunci baris. Contohnya, "rowkey":"name:address" Pendekatan ini dapat menghasilkan row key yang memerlukan pemindaian tabel lengkap untuk semua permintaan baca. |
"rowkey":"id_rowkey" , |
kolom | Pemetaan setiap kolom DataFrame ke grup kolom Bigtable ("cf" ) dan nama kolom ("col" ) yang sesuai. Nama kolom dapat berbeda dari nama kolom dalam tabel DataFrame. Jenis data yang didukung mencakup string , long , dan binary . |
"columns": {"id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"}, "name": {"cf": "info", "col": "name", "type": "string"}, "birthYear": {"cf":"info", "col": "birth_year", "type": "long"}, "address": {"cf": "location", "col": "address", "type":"string"}}" Dalam contoh ini, id_rowkey adalah kunci baris, dan info serta location adalah grup kolom. |
Jenis data yang didukung
Konektor mendukung penggunaan jenis string
, long
, dan binary
(array byte)
dalam katalog. Hingga dukungan untuk jenis lain seperti int
dan float
ditambahkan, Anda dapat mengonversi jenis data tersebut secara manual menjadi array byte (BinaryType
Spark SQL) sebelum menggunakan konektor untuk menuliskannya ke Bigtable.
Selain itu, Anda dapat menggunakan Avro untuk menyerialisasi jenis
yang kompleks, seperti ArrayType
. Untuk mengetahui informasi selengkapnya, lihat Menyerialkan jenis data kompleks menggunakan Apache Avro.
Menulis ke Bigtable
Gunakan fungsi .write()
dan opsi yang didukung untuk menulis data Anda ke Bigtable.
Java
Kode berikut dari repositori GitHub menggunakan Java dan Maven untuk menulis ke Bigtable.
String catalog = "{" +
"\"table\":{\"name\":\"" + tableName + "\"," +
"\"tableCoder\":\"PrimitiveType\"}," +
"\"rowkey\":\"wordCol\"," +
"\"columns\":{" +
"\"word\":{\"cf\":\"rowkey\", \"col\":\"wordCol\", \"type\":\"string\"}," +
"\"count\":{\"cf\":\"example_family\", \"col\":\"countCol\", \"type\":\"long\"}" +
"}}".replaceAll("\\s+", "");
…
private static void writeDataframeToBigtable(Dataset<Row> dataframe, String catalog,
String createNewTable) {
dataframe
.write()
.format("bigtable")
.option("catalog", catalog)
.option("spark.bigtable.project.id", projectId)
.option("spark.bigtable.instance.id", instanceId)
.option("spark.bigtable.create.new.table", createNewTable)
.save();
}
Python
Kode berikut dari repositori GitHub menggunakan Python untuk menulis ke Bigtable.
catalog = ''.join(("""{
"table":{"name":" """ + bigtable_table_name + """
", "tableCoder":"PrimitiveType"},
"rowkey":"wordCol",
"columns":{
"word":{"cf":"rowkey", "col":"wordCol", "type":"string"},
"count":{"cf":"example_family", "col":"countCol", "type":"long"}
}
}""").split())
…
input_data = spark.createDataFrame(data)
print('Created the DataFrame:')
input_data.show()
input_data.write \
.format('bigtable') \
.options(catalog=catalog) \
.option('spark.bigtable.project.id', bigtable_project_id) \
.option('spark.bigtable.instance.id', bigtable_instance_id) \
.option('spark.bigtable.create.new.table', create_new_table) \
.save()
print('DataFrame was written to Bigtable.')
…
Membaca dari Bigtable
Gunakan fungsi .read()
untuk memeriksa apakah tabel berhasil diimpor ke Bigtable.
Java
…
private static Dataset<Row> readDataframeFromBigtable(String catalog) {
Dataset<Row> dataframe = spark
.read()
.format("bigtable")
.option("catalog", catalog)
.option("spark.bigtable.project.id", projectId)
.option("spark.bigtable.instance.id", instanceId)
.load();
return dataframe;
}
Python
…
records = spark.read \
.format('bigtable') \
.option('spark.bigtable.project.id', bigtable_project_id) \
.option('spark.bigtable.instance.id', bigtable_instance_id) \
.options(catalog=catalog) \
.load()
print('Reading the DataFrame from Bigtable:')
records.show()
Kompilasikan project Anda
Buat file JAR yang digunakan untuk menjalankan tugas di cluster Dataproc, Dataproc Serverless, atau instance Spark lokal. Anda dapat mengompilasi file JAR secara lokal, lalu menggunakannya untuk mengirimkan tugas. Jalur ke JAR yang dikompilasi ditetapkan sebagai variabel lingkungan PATH_TO_COMPILED_JAR
saat Anda mengirimkan tugas.
Langkah ini tidak berlaku untuk aplikasi PySpark.
Mengelola dependensi
Konektor Bigtable Spark mendukung alat pengelolaan dependensi berikut:
Kompilasi file JAR
Maven
Tambahkan dependensi
spark-bigtable
ke file pom.xml Anda.<dependencies> <dependency> <groupId>com.google.cloud.spark.bigtable</groupId> <artifactId>spark-bigtable_SCALA_VERSION</artifactId> <version>0.1.0</version> </dependency> </dependencies>
Tambahkan plugin Maven Shade ke file
pom.xml
Anda untuk membuat JAR uber:<plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.2.4</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> </execution> </executions> </plugin> </plugins>
Jalankan perintah
mvn clean install
untuk membuat file JAR.
sbt
Tambahkan dependensi
spark-bigtable
ke filebuild.sbt
Anda:libraryDependencies += "com.google.cloud.spark.bigtable" % "spark-bigtable_SCALA_VERSION" % "0.1.0{""}}"
Tambahkan plugin
sbt-assembly
ke fileproject/plugins.sbt
atauproject/assembly.sbt
Anda untuk membuat file Uber JAR.addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.1")
Jalankan perintah
sbt clean assembly
untuk membuat file JAR.
Gradle
Tambahkan dependensi
spark-bigtable
ke filebuild.gradle
Anda.dependencies { implementation group: 'com.google.cloud.bigtable', name: 'spark-bigtable_SCALA_VERSION', version: '0.1.0' }
Tambahkan plugin Shadow di file
build.gradle
Anda untuk membuat file JAR uber:plugins { id 'com.github.johnrengelman.shadow' version '8.1.1' id 'java' }
Lihat dokumentasi plugin Shadow untuk mengetahui informasi konfigurasi dan kompilasi JAR selengkapnya.
Mengirim tugas
Kirimkan tugas Spark menggunakan Dataproc, Dataproc Serverless, atau instance Spark lokal untuk meluncurkan aplikasi Anda.
Menetapkan lingkungan runtime
Tetapkan variabel lingkungan berikut.
#Google Cloud
export BIGTABLE_SPARK_PROJECT_ID=PROJECT_ID
export BIGTABLE_SPARK_INSTANCE_ID=INSTANCE_ID
export BIGTABLE_SPARK_TABLE_NAME=TABLE_NAME
export BIGTABLE_SPARK_DATAPROC_CLUSTER=DATAPROC_CLUSTER
export BIGTABLE_SPARK_DATAPROC_REGION=DATAPROC_REGION
export BIGTABLE_SPARK_DATAPROC_ZONE=DATAPROC_ZONE
#Dataproc Serverless
export BIGTABLE_SPARK_SUBNET=SUBNET
export BIGTABLE_SPARK_GCS_BUCKET_NAME=GCS_BUCKET_NAME
#Scala/Java
export PATH_TO_COMPILED_JAR=PATH_TO_COMPILED_JAR
#PySpark
export GCS_PATH_TO_CONNECTOR_JAR=GCS_PATH_TO_CONNECTOR_JAR
export PATH_TO_PYTHON_FILE=PATH_TO_PYTHON_FILE
export LOCAL_PATH_TO_CONNECTOR_JAR=LOCAL_PATH_TO_CONNECTOR_JAR
Ganti kode berikut:
- PROJECT_ID: ID permanen untuk project Bigtable.
- INSTANCE_ID: ID permanen untuk instance Bigtable.
- TABLE_NAME: ID permanen untuk tabel.
- DATAPROC_CLUSTER: ID permanen untuk cluster Dataproc.
- DATAPROC_REGION: Region Dataproc yang berisi salah satu cluster di instance Dataproc Anda—misalnya,
northamerica-northeast2
. - DATAPROC_ZONE: Zona tempat cluster Dataproc berjalan.
- SUBNET: Jalur resource lengkap subnet.
- GCS_BUCKET_NAME: Bucket Cloud Storage untuk mengupload dependensi beban kerja Spark.
- PATH_TO_COMPILED_JAR: Jalur lengkap atau relatif ke JAR yang dikompilasi—misalnya,
/path/to/project/root/target/<compiled_JAR_name>
untuk Maven. - GCS_PATH_TO_CONNECTOR_JAR: Bucket Cloud Storage
gs://spark-lib/bigtable
, tempat filespark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar
berada. - PATH_TO_PYTHON_FILE: Untuk aplikasi PySpark, jalur ke file Python yang akan digunakan untuk menulis data ke dan membaca data dari Bigtable.
- LOCAL_PATH_TO_CONNECTOR_JAR: Untuk aplikasi PySpark, jalur ke file JAR konektor Bigtable Spark yang didownload.
Mengirimkan tugas Spark
Untuk instance Dataproc atau penyiapan Spark lokal, jalankan tugas Spark untuk mengupload data ke Bigtable.
Cluster Dataproc
Gunakan file JAR yang dikompilasi dan buat tugas cluster Dataproc yang membaca dan menulis data dari dan ke Bigtable.
Buat cluster Dataproc. Contoh berikut menunjukkan contoh perintah untuk membuat cluster Dataproc v2.0 dengan Debian 10, dua worker node, dan konfigurasi default.
gcloud dataproc clusters create \ $BIGTABLE_SPARK_DATAPROC_CLUSTER --region $BIGTABLE_SPARK_DATAPROC_REGION \ --zone $BIGTABLE_SPARK_DATAPROC_ZONE \ --master-machine-type n2-standard-4 --master-boot-disk-size 500 \ --num-workers 2 --worker-machine-type n2-standard-4 --worker-boot-disk-size 500 \ --image-version 2.0-debian10 --project $BIGTABLE_SPARK_PROJECT_ID
Kirimkan tugas.
Scala/Java
Contoh berikut menunjukkan class
spark.bigtable.example.WordCount
yang mencakup logika untuk membuat tabel pengujian di DataFrame, menulis tabel ke Bigtable, lalu menghitung jumlah kata dalam tabel.gcloud dataproc jobs submit spark \ --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \ --region=$BIGTABLE_SPARK_DATAPROC_REGION \ --class=spark.bigtable.example.WordCount \ --jar=$PATH_TO_COMPILED_JAR \ -- \ $BIGTABLE_SPARK_PROJECT_ID \ $BIGTABLE_SPARK_INSTANCE_ID \ $BIGTABLE_SPARK_TABLE_NAME \
PySpark
gcloud dataproc jobs submit pyspark \ --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \ --region=$BIGTABLE_SPARK_DATAPROC_REGION \ --jars=$GCS_PATH_TO_CONNECTOR_JAR \ --properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \ $PATH_TO_PYTHON_FILE \ -- \ --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \ --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \ --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME \
Dataproc Serverless
Gunakan file JAR yang dikompilasi dan buat tugas Dataproc yang membaca dan menulis data dari dan ke Bigtable dengan instance Dataproc Serverless.
Scala/Java
gcloud dataproc batches submit spark \
--region=$BIGTABLE_SPARK_DATAPROC_REGION \
--subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
--deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME --jar=$PATH_TO_COMPILED_JAR \
-- \
$BIGTABLE_SPARK_PROJECT_ID \
$BIGTABLE_SPARK_INSTANCE_ID \
$BIGTABLE_SPARK_TABLE_NAME
PySpark
gcloud dataproc batches submit pyspark $PATH_TO_PYTHON_FILE \
--region=$BIGTABLE_SPARK_DATAPROC_REGION \
--subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
--deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME \
--jars=$GCS_PATH_TO_CONNECTOR_JAR \
--properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \
-- \
--bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
--bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
--bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME
Spark Lokal
Gunakan file JAR yang didownload dan buat tugas Spark yang membaca dan menulis data dari dan ke Bigtable dengan instance Spark lokal. Anda juga dapat menggunakan emulator Bigtable untuk mengirimkan tugas Spark.
Menggunakan emulator Bigtable
Jika Anda memutuskan untuk menggunakan emulator Bigtable, ikuti langkah-langkah berikut:
Jalankan perintah berikut untuk memulai emulator:
gcloud beta emulators bigtable start
Secara default, emulator memilih
localhost:8086
.Tetapkan variabel lingkungan
BIGTABLE_EMULATOR_HOST
:export BIGTABLE_EMULATOR_HOST=localhost:8086
Untuk mengetahui informasi selengkapnya tentang cara menggunakan emulator Bigtable, lihat Menguji menggunakan emulator.
Mengirimkan tugas Spark
Gunakan perintah spark-submit
untuk mengirimkan tugas Spark, terlepas dari apakah Anda menggunakan emulator Bigtable lokal atau tidak.
Scala/Java
spark-submit $PATH_TO_COMPILED_JAR \
$BIGTABLE_SPARK_PROJECT_ID \
$BIGTABLE_SPARK_INSTANCE_ID \
$BIGTABLE_SPARK_TABLE_NAME
PySpark
spark-submit \
--jars=$LOCAL_PATH_TO_CONNECTOR_JAR \
--packages=org.slf4j:slf4j-reload4j:1.7.36 \
$PATH_TO_PYTHON_FILE \
--bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
--bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
--bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME
Memverifikasi data tabel
Jalankan perintah
cbt
CLI
berikut untuk memverifikasi bahwa data ditulis ke Bigtable. CLI
cbt
adalah komponen Google Cloud CLI. Untuk mengetahui informasi selengkapnya, lihat
ringkasan
cbt
CLI.
cbt -project=$BIGTABLE_SPARK_PROJECT_ID -instance=$BIGTABLE_SPARK_INSTANCE_ID \
read $BIGTABLE_SPARK_TABLE_NAME
Solusi tambahan
Gunakan konektor Bigtable Spark untuk solusi tertentu, seperti menyerialkan jenis Spark SQL yang kompleks, membaca baris tertentu, dan membuat metrik sisi klien.
Membaca baris DataFrame tertentu menggunakan filter
Saat menggunakan DataFrame untuk membaca dari Bigtable, Anda dapat menentukan filter untuk hanya membaca baris tertentu. Filter sederhana seperti ==
, <=
, dan startsWith
pada kolom kunci baris diterapkan di sisi server untuk menghindari pemindaian seluruh tabel. Filter pada kunci baris gabungan atau filter kompleks seperti filter LIKE
pada kolom kunci baris diterapkan di sisi klien.
Jika Anda membaca tabel besar, sebaiknya gunakan filter kunci baris sederhana untuk menghindari pemindaian seluruh tabel. Contoh pernyataan berikut menunjukkan cara membaca menggunakan filter sederhana. Pastikan bahwa di filter Spark, Anda menggunakan nama kolom DataFrame yang dikonversi menjadi kunci baris:
dataframe.filter("id == 'some_id'").show()
Saat menerapkan filter, gunakan nama kolom DataFrame, bukan nama kolom tabel Bigtable.
Melakukan serialisasi jenis data kompleks menggunakan Apache Avro
Konektor Bigtable Spark menyediakan dukungan untuk menggunakan Apache Avro guna melakukan serialisasi jenis SQL Spark yang kompleks, seperti ArrayType
, MapType
, atau StructType
. Apache Avro menyediakan serialisasi data untuk data rekaman yang umumnya digunakan untuk memproses dan menyimpan struktur data yang kompleks.
Gunakan sintaksis seperti "avro":"avroSchema"
untuk menentukan bahwa kolom di Bigtable harus dienkode menggunakan Avro. Kemudian, Anda dapat menggunakan .option("avroSchema", avroSchemaString)
saat membaca dari atau menulis ke Bigtable untuk menentukan skema Avro yang sesuai dengan kolom tersebut dalam format string. Anda dapat menggunakan nama opsi yang berbeda—misalnya, "anotherAvroSchema"
untuk kolom yang berbeda dan meneruskan skema Avro untuk beberapa kolom.
def catalogWithAvroColumn = s"""{
|"table":{"name":"ExampleAvroTable"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"string"},
|"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
|}
|}""".stripMargin
Menggunakan metrik sisi klien
Karena konektor Bigtable Spark didasarkan pada Bigtable Client for Java, metrik sisi klien diaktifkan di dalam konektor secara default. Anda dapat membaca dokumentasi metrik sisi klien untuk menemukan detail selengkapnya tentang cara mengakses dan menafsirkan metrik ini.
Menggunakan klien Bigtable untuk Java dengan fungsi RDD tingkat rendah
Karena konektor Bigtable Spark didasarkan pada klien Bigtable untuk Java, Anda dapat langsung menggunakan klien di aplikasi Spark dan melakukan permintaan baca atau tulis terdistribusi dalam fungsi RDD tingkat rendah seperti mapPartitions
dan foreachPartition
.
Untuk menggunakan klien Bigtable untuk class Java, tambahkan awalan com.google.cloud.spark.bigtable.repackaged
ke nama paket. Misalnya, alih-alih menggunakan nama class sebagai com.google.cloud.bigtable.data.v2.BigtableDataClient
, gunakan com.google.cloud.spark.bigtable.repackaged.com.google.cloud.bigtable.data.v2.BigtableDataClient
.
Untuk mengetahui informasi selengkapnya tentang klien Bigtable untuk Java, lihat klien Bigtable untuk Java.
Langkah berikutnya
- Pelajari cara mengoptimalkan tugas Spark di Dataproc.
- Gunakan class dari klien Bigtable untuk Java dengan konektor Bigtable Spark.