Template aliran perubahan Bigtable ke Pub/Sub adalah pipeline streaming yang mengalirkan rekaman perubahan data Bigtable dan memublikasikannya ke topik Pub/Sub menggunakan Dataflow.
Dengan aliran perubahan Bigtable, Anda dapat berlangganan mutasi data per tabel. Saat Anda berlangganan aliran perubahan tabel, batasan berikut berlaku:
- Hanya sel yang diubah dan deskriptor operasi penghapusan yang ditampilkan.
- Hanya nilai baru sel yang diubah yang ditampilkan.
Saat data perubahan dipublikasikan ke topik Pub/Sub, pesan mungkin disisipkan secara tidak berurutan dibandingkan dengan urutan stempel waktu commit Bigtable asli.
Catatan perubahan data Bigtable yang tidak dapat dipublikasikan ke topik Pub/Sub untuk sementara ditempatkan di direktori antrean pesan yang dihentikan pengirimannya (antrean pesan yang tidak diproses) di Cloud Storage. Setelah jumlah maksimum percobaan ulang yang tidak berhasil, kumpulan data ini akan ditempatkan tanpa batas waktu di direktori antrean pesan yang tidak terkirim yang sama untuk peninjauan manual atau pemrosesan lebih lanjut oleh pengguna.
Pipeline mengharuskan topik Pub/Sub tujuan ada. Topik tujuan dapat dikonfigurasi untuk memvalidasi pesan menggunakan skema. Jika topik Pub/Sub menentukan skema, pipeline hanya dimulai jika skema valid. Bergantung pada jenis skema, gunakan salah satu definisi skema berikut untuk topik tujuan:
Buffering protokol
syntax = "proto2"; package com.google.cloud.teleport.bigtable; option java_outer_classname = "ChangeLogEntryProto"; message ChangelogEntryProto{ required bytes rowKey = 1; enum ModType { SET_CELL = 0; DELETE_FAMILY = 1; DELETE_CELLS = 2; UNKNOWN = 3; } required ModType modType = 2; required bool isGC = 3; required int32 tieBreaker = 4; required int64 commitTimestamp = 5; required string columnFamily = 6; optional bytes column = 7; optional int64 timestamp = 8; optional int64 timestampFrom = 9; optional int64 timestampTo = 10; optional bytes value = 11; required string sourceInstance = 12; required string sourceCluster = 13; required string sourceTable = 14; }
Avro
{ "name" : "ChangelogEntryMessage", "type" : "record", "namespace" : "com.google.cloud.teleport.bigtable", "fields" : [ { "name" : "rowKey", "type" : "bytes"}, { "name" : "modType", "type" : { "name": "ModType", "type": "enum", "symbols": ["SET_CELL", "DELETE_FAMILY", "DELETE_CELLS", "UNKNOWN"]} }, { "name": "isGC", "type": "boolean" }, { "name": "tieBreaker", "type": "int"}, { "name": "columnFamily", "type": "string"}, { "name": "commitTimestamp", "type" : "long"}, { "name" : "sourceInstance", "type" : "string"}, { "name" : "sourceCluster", "type" : "string"}, { "name" : "sourceTable", "type" : "string"}, { "name": "column", "type" : ["null", "bytes"]}, { "name": "timestamp", "type" : ["null", "long"]}, { "name": "timestampFrom", "type" : ["null", "long"]}, { "name": "timestampTo", "type" : ["null", "long"]}, { "name" : "value", "type" : ["null", "bytes"]} ] }
JSON
Gunakan skema Protobuf berikut dengan encoding pesan JSON
:
syntax = "proto2"; package com.google.cloud.teleport.bigtable; option java_outer_classname = "ChangelogEntryMessageText"; message ChangelogEntryText{ required string rowKey = 1; enum ModType { SET_CELL = 0; DELETE_FAMILY = 1; DELETE_CELLS = 2; UNKNOWN = 3; } required ModType modType = 2; required bool isGC = 3; required int32 tieBreaker = 4; required int64 commitTimestamp = 5; required string columnFamily = 6; optional string column = 7; optional int64 timestamp = 8; optional int64 timestampFrom = 9; optional int64 timestampTo = 10; optional string value = 11; required string sourceInstance = 12; required string sourceCluster = 13; required string sourceTable = 14; }
Setiap pesan Pub/Sub baru menyertakan satu entri dari rekaman perubahan data yang ditampilkan oleh aliran perubahan dari baris yang sesuai dalam tabel Bigtable Anda. Template Pub/Sub meratakan entri di setiap catatan perubahan data menjadi perubahan tingkat sel individual.
Deskripsi pesan output Pub/Sub
Nama kolom | Deskripsi |
---|---|
rowKey |
Kunci baris dari baris yang diubah. Tiba dalam bentuk array byte. Jika encoding pesan JSON dikonfigurasi, kunci baris akan ditampilkan sebagai string. Jika useBase64Rowkeys ditentukan, kunci baris dienkode Base64. Jika tidak, charset yang ditentukan oleh bigtableChangeStreamCharset akan digunakan untuk mendekode byte kunci baris menjadi string. |
modType |
Jenis mutasi baris. Gunakan salah satu nilai berikut: SET_CELL , DELETE_CELLS , atau DELETE_FAMILY . |
columnFamily |
Grup kolom yang terpengaruh oleh mutasi baris. |
column |
Penentu kolom yang terpengaruh oleh mutasi baris. Untuk jenis mutasi DELETE_FAMILY , kolom tidak ditetapkan. Tiba dalam bentuk array byte. Jika encoding pesan JSON dikonfigurasi, kolom akan ditampilkan sebagai string. Jika useBase64ColumnQualifier ditentukan, kolom akan dienkode Base64. Jika tidak, charset yang ditentukan oleh bigtableChangeStreamCharset akan digunakan untuk mendekode byte kunci baris menjadi string. |
commitTimestamp |
Waktu saat Bigtable menerapkan mutasi. Waktu diukur dalam mikrodetik sejak epoch Unix (1 Januari 1970 pada UTC). |
timestamp |
Nilai stempel waktu sel yang terpengaruh oleh mutasi. Untuk jenis mutasi DELETE_CELLS dan DELETE_FAMILY , stempel waktu tidak ditetapkan. Waktu diukur dalam mikrodetik sejak epoch Unix (1 Januari 1970 pada UTC). |
timestampFrom |
Menjelaskan awal inklusif interval stempel waktu untuk semua sel yang dihapus oleh mutasi DELETE_CELLS . Untuk jenis mutasi lainnya, timestampFrom tidak ditetapkan. Waktu diukur dalam mikrodetik sejak epoch Unix (1 Januari 1970 pada UTC). |
timestampTo |
Menjelaskan akhir eksklusif interval stempel waktu untuk semua sel yang dihapus oleh mutasi DELETE_CELLS . Untuk jenis mutasi lainnya, timestampTo tidak ditetapkan. |
isGC |
Nilai boolean yang menunjukkan apakah mutasi dihasilkan oleh mekanisme pengumpulan sampah Bigtable. |
tieBreaker |
Jika dua mutasi didaftarkan secara bersamaan oleh cluster Bigtable yang berbeda, mutasi dengan nilai tiebreaker tertinggi akan diterapkan ke tabel sumber. Mutasi dengan nilai tiebreaker yang lebih rendah akan dihapus. |
value |
Nilai baru yang ditetapkan oleh mutasi. Kecuali jika opsi pipeline stripValues ditetapkan, nilai akan ditetapkan untuk mutasi SET_CELL . Untuk jenis mutasi lainnya, nilai tidak ditetapkan. Tiba dalam bentuk array byte. Jika encoding pesan JSON dikonfigurasi, nilai akan ditampilkan sebagai string.
Jika useBase64Values ditentukan, nilainya dienkode Base64. Jika tidak, charset yang ditentukan oleh bigtableChangeStreamCharset akan digunakan untuk mendekode byte nilai menjadi string. |
sourceInstance |
Nama instance Bigtable yang mendaftarkan mutasi. Mungkin terjadi saat beberapa pipeline melakukan streaming perubahan dari instance yang berbeda ke topik Pub/Sub yang sama. |
sourceCluster |
Nama cluster Bigtable yang mendaftarkan mutasi. Dapat digunakan saat beberapa pipeline melakukan streaming perubahan dari berbagai instance ke topik Pub/Sub yang sama. |
sourceTable |
Nama tabel Bigtable yang menerima mutasi. Dapat digunakan jika beberapa pipeline melakukan streaming perubahan dari tabel yang berbeda ke topik Pub/Sub yang sama. |
Persyaratan pipeline
- Instance sumber Bigtable yang ditentukan.
- Tabel sumber Bigtable yang ditentukan. Tabel harus mengaktifkan aliran perubahan.
- Profil aplikasi Bigtable yang ditentukan.
- Topik Pub/Sub yang ditentukan harus ada.
Parameter template
Parameter yang diperlukan
- pubSubTopic: Nama topik Pub/Sub tujuan.
- bigtableChangeStreamAppProfile: ID profil aplikasi Bigtable. Profil aplikasi harus menggunakan perutean cluster tunggal dan mengizinkan transaksi baris tunggal.
- bigtableReadInstanceId: ID instance Bigtable sumber.
- bigtableReadTableId: ID tabel Bigtable sumber.
Parameter opsional
- messageEncoding: Encoding pesan yang akan dipublikasikan ke topik Pub/Sub. Saat skema topik tujuan dikonfigurasi, encoding pesan ditentukan oleh setelan topik. Nilai berikut didukung:
BINARY
danJSON
. Setelan defaultnya adalahJSON
. - messageFormat: Encoding pesan yang akan dipublikasikan ke topik Pub/Sub. Saat skema topik tujuan dikonfigurasi, encoding pesan ditentukan oleh setelan topik. Nilai berikut didukung:
AVRO
,PROTOCOL_BUFFERS
, danJSON
. Nilai defaultnya adalahJSON
. Jika formatJSON
digunakan, kolom rowKey, column, dan value dari pesan adalah string, yang isinya ditentukan oleh opsi pipelineuseBase64Rowkeys
,useBase64ColumnQualifiers
,useBase64Values
, danbigtableChangeStreamCharset
. - stripValues: Jika ditetapkan ke
true
, mutasiSET_CELL
akan ditampilkan tanpa nilai baru yang ditetapkan. Setelan defaultnya adalahfalse
. Parameter ini berguna saat Anda tidak memerlukan nilai baru, yang juga dikenal sebagai invalidasi cache, atau saat nilai sangat besar dan melampaui batas ukuran pesan Pub/Sub. - dlqDirectory: Direktori untuk antrean pesan yang tidak terkirim. Data yang gagal diproses disimpan di direktori ini. Secara default, direktori berada di bawah lokasi sementara tugas Dataflow. Dalam sebagian besar kasus, Anda dapat menggunakan jalur default.
- dlqRetryMinutes: Jumlah menit antara percobaan ulang antrean pesan yang tidak terkirim. Nilai defaultnya adalah
10
. - dlqMaxRetries: Jumlah maksimum percobaan ulang pesan yang pengirimannya dihentikan. Nilai defaultnya adalah
5
. - useBase64Rowkeys: Digunakan dengan encoding pesan JSON. Jika disetel ke
true
, kolomrowKey
adalah string berenkode Base64. Jika tidak,rowKey
dihasilkan dengan menggunakanbigtableChangeStreamCharset
untuk mendekode byte menjadi string. Nilai default-nya adalahfalse
. - pubSubProjectId: Project ID Bigtable. Defaultnya adalah project tugas Dataflow.
- useBase64ColumnQualifiers: Digunakan dengan encoding pesan JSON. Jika disetel ke
true
, kolomcolumn
adalah string berenkode Base64. Jika tidak, kolom dihasilkan dengan menggunakanbigtableChangeStreamCharset
untuk mendekode byte menjadi string. Setelan defaultnya adalahfalse
. - useBase64Values: Digunakan dengan encoding pesan JSON. Jika disetel ke
true
, kolom nilai adalah string berenkode Base64. Jika tidak, nilai dihasilkan dengan menggunakanbigtableChangeStreamCharset
untuk mendekode byte menjadi string. Setelan defaultnya adalahfalse
. - disableDlqRetries: Apakah akan menonaktifkan percobaan ulang untuk DLQ atau tidak. Nilai defaultnya adalah: false.
- bigtableChangeStreamMetadataInstanceId: ID instance metadata aliran perubahan Bigtable. Nilai defaultnya adalah kosong.
- bigtableChangeStreamMetadataTableTableId: ID tabel metadata konektor aliran perubahan data Bigtable. Jika tidak disediakan, tabel metadata konektor aliran perubahan Bigtable akan otomatis dibuat selama eksekusi pipeline. Nilai defaultnya adalah kosong.
- bigtableChangeStreamCharset: Nama charset aliran perubahan Bigtable. Nilai defaultnya adalah: UTF-8.
- bigtableChangeStreamStartTimestamp: Stempel waktu awal (https://tools.ietf.org/html/rfc3339), inklusif, yang akan digunakan untuk membaca aliran perubahan. Misalnya,
2022-05-05T07:59:59Z
. Default-nya adalah stempel waktu waktu mulai pipeline. - bigtableChangeStreamIgnoreColumnFamilies: Daftar perubahan nama grup kolom yang dipisahkan koma untuk diabaikan. Nilai defaultnya adalah kosong.
- bigtableChangeStreamIgnoreColumns: Daftar perubahan nama kolom yang dipisahkan koma untuk diabaikan. Contoh: "cf1:col1,cf2:col2". Nilai defaultnya adalah kosong.
- bigtableChangeStreamName: Nama unik untuk pipeline klien. Memungkinkan Anda melanjutkan pemrosesan dari titik saat pipeline yang sebelumnya berjalan berhenti. Secara default, nama dibuat secara otomatis. Lihat log tugas Dataflow untuk nilai yang digunakan.
- bigtableChangeStreamResume: Jika disetel ke
true
, pipeline baru akan melanjutkan pemrosesan dari titik saat pipeline yang sebelumnya berjalan dengan nilaibigtableChangeStreamName
yang sama berhenti. Jika pipeline dengan nilaibigtableChangeStreamName
yang diberikan belum pernah dijalankan, pipeline baru tidak akan dimulai. Jika disetel kefalse
, pipeline baru akan dimulai. Jika pipeline dengan nilaibigtableChangeStreamName
yang sama telah berjalan untuk sumber tertentu, pipeline baru tidak akan dimulai. Nilai defaultnya adalahfalse
. - bigtableReadChangeStreamTimeoutMs: Waktu tunggu untuk permintaan Bigtable ReadChangeStream dalam milidetik.
- bigtableReadProjectId: Project ID Bigtable. Defaultnya adalah project untuk tugas Dataflow.
Menjalankan template
Konsol
- Buka halaman Dataflow Create job from template. Buka Membuat tugas dari template
- Di kolom Nama tugas, masukkan nama tugas yang unik.
- Opsional: Untuk Endpoint regional, pilih nilai dari menu drop-down. Region
default-nya adalah
us-central1
.Untuk mengetahui daftar region tempat Anda dapat menjalankan tugas Dataflow, lihat Lokasi Dataflow.
- Dari menu drop-down Template Dataflow, pilih the Bigtable change streams to Pub/Sub template.
- Di kolom parameter yang disediakan, masukkan nilai parameter Anda.
- Klik Run job.
gcloud
Di shell atau terminal Anda, jalankan template:
gcloud dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub \ --parameters \ bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\ bigtableReadTableId=BIGTABLE_TABLE_ID,\ bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\ pubSubTopic=PUBSUB_TOPIC
Ganti kode berikut:
PROJECT_ID
: ID Google Cloud project tempat Anda ingin menjalankan tugas DataflowJOB_NAME
: nama tugas unik pilihan AndaVERSION
: versi template yang ingin Anda gunakanAnda dapat menggunakan nilai berikut:
latest
untuk menggunakan versi template terbaru, yang tersedia di folder induk tanpa tanggal di bucket— gs://dataflow-templates-REGION_NAME/latest/- nama versi, seperti
2023-09-12-00_RC00
, untuk menggunakan versi template tertentu, yang dapat ditemukan bertingkat di folder induk yang diberi tanggal di bucket— gs://dataflow-templates-REGION_NAME/
REGION_NAME
: region tempat Anda ingin men-deploy tugas Dataflow—misalnya,us-central1
BIGTABLE_INSTANCE_ID
: ID instance Bigtable Anda.BIGTABLE_TABLE_ID
: ID tabel Bigtable Anda.BIGTABLE_APPLICATION_PROFILE_ID
: ID profil aplikasi Bigtable Anda.PUBSUB_TOPIC
: nama topik tujuan Pub/Sub
API
Untuk menjalankan template menggunakan REST API, kirim permintaan HTTP POST. Untuk mengetahui informasi selengkapnya tentang
API dan cakupan otorisasinya, lihat
projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub", "parameters": { "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID", "bigtableReadTableId": "BIGTABLE_TABLE_ID", "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID", "pubSubTopic": "PUBSUB_TOPIC" } } }
Ganti kode berikut:
PROJECT_ID
: ID Google Cloud project tempat Anda ingin menjalankan tugas DataflowJOB_NAME
: nama tugas unik pilihan AndaVERSION
: versi template yang ingin Anda gunakanAnda dapat menggunakan nilai berikut:
latest
untuk menggunakan versi template terbaru, yang tersedia di folder induk tanpa tanggal di bucket— gs://dataflow-templates-REGION_NAME/latest/- nama versi, seperti
2023-09-12-00_RC00
, untuk menggunakan versi template tertentu, yang dapat ditemukan bertingkat di folder induk yang diberi tanggal di bucket— gs://dataflow-templates-REGION_NAME/
LOCATION
: region tempat Anda ingin men-deploy tugas Dataflow—misalnya,us-central1
BIGTABLE_INSTANCE_ID
: ID instance Bigtable Anda.BIGTABLE_TABLE_ID
: ID tabel Bigtable Anda.BIGTABLE_APPLICATION_PROFILE_ID
: ID profil aplikasi Bigtable Anda.PUBSUB_TOPIC
: nama topik tujuan Pub/Sub
Langkah berikutnya
- Pelajari template Dataflow.
- Lihat daftar template yang disediakan Google.