Template Datastream ke Spanner

Template Datastream ke Spanner adalah pipeline streaming yang membaca peristiwa Datastream dari bucket Cloud Storage dan menuliskannya ke database Spanner. Tujuannya adalah untuk migrasi data dari sumber Datastream ke Spanner. Tentukan parameter gcsPubSubSubscription untuk membaca data dari notifikasi Pub/Sub, ATAU berikan parameter inputFilePattern untuk membaca data langsung dari file di Cloud Storage.

Semua tabel yang diperlukan untuk migrasi harus ada di database Spanner tujuan sebelum eksekusi template. Oleh karena itu, migrasi skema dari database sumber ke Spanner tujuan harus diselesaikan sebelum migrasi data. Data dapat ada dalam tabel sebelum migrasi. Template ini tidak menyebarkan perubahan skema Datastream ke database Spanner.

Konsistensi data hanya dijamin di akhir migrasi saat semua data telah ditulis ke Spanner. Untuk menyimpan informasi pengurutan setiap data yang ditulis ke Spanner, template ini membuat tabel tambahan (disebut tabel bayangan) untuk setiap tabel di database Spanner. Tindakan ini digunakan untuk memastikan konsistensi di akhir migrasi. Tabel bayangan tidak dihapus setelah migrasi dan dapat digunakan untuk tujuan validasi di akhir migrasi.

Setiap error yang terjadi selama operasi, seperti ketidakcocokan skema, file JSON yang salah format, atau error yang dihasilkan dari menjalankan transformasi, dicatat dalam antrean error. Antrean error adalah folder Cloud Storage yang menyimpan semua peristiwa Datastream yang mengalami error beserta alasan error dalam format teks. Error dapat bersifat sementara atau permanen dan disimpan di folder Cloud Storage yang sesuai dalam antrean error. Error sementara dicoba lagi secara otomatis, sedangkan error permanen tidak. Jika terjadi error permanen, Anda memiliki opsi untuk mengoreksi peristiwa perubahan dan memindahkannya ke bucket yang dapat dicoba lagi saat template berjalan.

Persyaratan pipeline

  • Aliran Datastream dalam status Berjalan atau Belum dimulai.
  • Bucket Cloud Storage tempat peristiwa Datastream direplikasi.
  • Database Spanner dengan tabel yang ada. Tabel ini dapat kosong atau berisi data.

Parameter template

Parameter yang diperlukan

  • instanceId: Instance Spanner tempat perubahan direplikasi.
  • databaseId: Database Spanner tempat perubahan direplikasi.

Parameter opsional

  • inputFilePattern: Lokasi file Cloud Storage yang berisi file Datastream untuk direplikasi. Biasanya, ini adalah jalur root untuk aliran. Dukungan untuk fitur ini telah dinonaktifkan. Gunakan fitur ini hanya untuk mencoba lagi entri yang masuk ke DLQ berat.
  • inputFileFormat: Format file output yang dihasilkan oleh Datastream. Contohnya, avro,json. Setelan defaultnya adalah avro.
  • sessionFilePath: Jalur file sesi di Cloud Storage yang berisi informasi pemetaan dari HarbourBridge.
  • projectId: Project ID Spanner.
  • spannerHost: Endpoint Cloud Spanner yang akan dipanggil dalam template. Contoh, https://batch-spanner.googleapis.com. Secara default: https://batch-spanner.googleapis.com.
  • gcsPubSubSubscription: Langganan Pub/Sub yang digunakan dalam kebijakan notifikasi Cloud Storage. Untuk nama, gunakan format projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>.
  • streamName: Nama atau template untuk stream yang akan di-polling untuk mendapatkan informasi skema dan jenis sumber.
  • shadowTablePrefix: Awalan yang digunakan untuk memberi nama tabel bayangan. Default: shadow_.
  • shouldCreateShadowTables: Flag ini menunjukkan apakah tabel bayangan harus dibuat di database Cloud Spanner. Nilai defaultnya adalah: true.
  • rfcStartDateTime: DateTime awal yang digunakan untuk mengambil dari Cloud Storage (https://tools.ietf.org/html/rfc3339). Default: 1970-01-01T00:00:00.00Z.
  • fileReadConcurrency: Jumlah file DataStream serentak yang akan dibaca. Defaultnya adalah: 30.
  • deadLetterQueueDirectory: Jalur file yang digunakan saat menyimpan output antrean error. Jalur file default adalah direktori di bawah lokasi sementara tugas Dataflow.
  • dlqRetryMinutes: Jumlah menit antara percobaan ulang antrean pesan yang tidak terkirim. Nilai defaultnya adalah 10.
  • dlqMaxRetryCount: Jumlah maksimum percobaan ulang error sementara melalui DLQ. Nilai defaultnya adalah 500.
  • dataStreamRootUrl: URL Root Datastream API. Default ke: https://datastream.googleapis.com/.
  • datastreamSourceType: Ini adalah jenis database sumber yang terhubung ke Datastream. Contoh - mysql/oracle. Harus ditetapkan saat menguji tanpa Datastream yang benar-benar berjalan.
  • roundJsonDecimals: Jika disetel, tanda ini akan membulatkan nilai desimal di kolom JSON ke angka yang dapat disimpan tanpa kehilangan presisi. Nilai defaultnya adalah: false.
  • runMode: Ini adalah jenis mode run, baik reguler maupun dengan retryDLQ. Defaultnya adalah: reguler.
  • transformationContextFilePath: Jalur file konteks transformasi di penyimpanan cloud yang digunakan untuk mengisi data yang digunakan dalam transformasi yang dilakukan selama migrasi Misalnya: ID shard ke nama DB untuk mengidentifikasi DB tempat baris dimigrasikan.
  • directoryWatchDurationInMinutes: Durasi pipeline harus terus melakukan polling direktori di GCS. File output aliran data disusun dalam struktur direktori yang menggambarkan stempel waktu peristiwa yang dikelompokkan per menit. Parameter ini harus kira-kira sama dengan penundaan maksimum yang dapat terjadi antara peristiwa yang terjadi di database sumber dan peristiwa yang sama yang ditulis ke GCS oleh Datastream. Persentil ke-99,9 = 10 menit. Defaultnya adalah: 10.
  • spannerPriority: Prioritas permintaan untuk panggilan Cloud Spanner. Nilainya harus salah satu dari: [HIGH,MEDIUM,LOW]. Defaultnya adalah HIGH.
  • dlqGcsPubSubSubscription: Langganan Pub/Sub yang digunakan dalam kebijakan notifikasi Cloud Storage untuk direktori percobaan ulang DLQ saat berjalan dalam mode reguler. Untuk nama, gunakan format projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>. Jika ditetapkan, deadLetterQueueDirectory dan dlqRetryMinutes akan diabaikan.
  • transformationJarPath: Lokasi file JAR kustom di Cloud Storage untuk file yang berisi logika transformasi kustom untuk memproses rekaman dalam migrasi ke depan. Nilai defaultnya adalah kosong.
  • transformationClassName: Nama class yang sepenuhnya memenuhi syarat yang memiliki logika transformasi kustom. Kolom ini wajib diisi jika transformationJarPath ditentukan. Nilai defaultnya adalah kosong.
  • transformationCustomParameters: String yang berisi parameter kustom yang akan diteruskan ke class transformasi kustom. Nilai defaultnya adalah kosong.
  • filteredEventsDirectory: Ini adalah jalur file untuk menyimpan peristiwa yang difilter melalui transformasi kustom. Defaultnya adalah direktori di bawah lokasi sementara tugas Dataflow. Nilai default sudah cukup dalam sebagian besar kondisi.
  • shardingContextFilePath: Jalur file konteks sharding di Cloud Storage digunakan untuk mengisi ID shard di database Spanner untuk setiap shard sumber.Formatnya adalah Map<stream_name, Map<db_name, shard_id>>.
  • tableOverrides: Ini adalah penggantian nama tabel dari sumber ke Spanner. Data tersebut ditulis dalam format berikut: [{SourceTableName1, SpannerTableName1}, {SourceTableName2, SpannerTableName2}]Contoh ini menunjukkan pemetaan tabel Singers ke Vocalists dan tabel Albums ke Records. Contoh, [{Singers, Vocalists}, {Albums, Records}]. Nilai defaultnya adalah kosong.
  • columnOverrides: Ini adalah penggantian nama kolom dari sumber ke spanner. Ditulis dalam format berikut: [{SourceTableName1.SourceColumnName1, SourceTableName1.SpannerColumnName1}, {SourceTableName2.SourceColumnName1, SourceTableName2.SpannerColumnName1}]Perhatikan bahwa SourceTableName harus tetap sama dalam pasangan sumber dan Spanner. Untuk mengganti nama tabel, gunakan tableOverrides.Contoh ini menunjukkan pemetaan SingerName ke TalentName dan AlbumName ke RecordName di tabel Singers dan Albums. Contoh, [{Singers.SingerName, Singers.TalentName}, {Albums.AlbumName, Albums.RecordName}]. Nilai defaultnya adalah kosong.
  • schemaOverridesFilePath: File yang menentukan penggantian nama tabel dan kolom dari sumber ke Spanner. Nilai defaultnya adalah kosong.
  • shadowTableSpannerDatabaseId: Database terpisah opsional untuk tabel bayangan. Jika tidak ditentukan, tabel bayangan akan dibuat di database utama. Jika ditentukan, pastikan shadowTableSpannerInstanceId juga ditentukan. Nilai defaultnya adalah kosong.
  • shadowTableSpannerInstanceId: Instance terpisah opsional untuk tabel bayangan. Jika tidak ditentukan, tabel bayangan akan dibuat di instance utama. Jika ditentukan, pastikan shadowTableSpannerDatabaseId juga ditentukan. Nilai defaultnya adalah kosong.
  • failureInjectionParameter: Parameter injeksi kegagalan. Hanya digunakan untuk pengujian. Nilai defaultnya adalah kosong.

Menjalankan template

Konsol

  1. Buka halaman Dataflow Create job from template.
  2. Buka Membuat tugas dari template
  3. Di kolom Nama tugas, masukkan nama tugas yang unik.
  4. Opsional: Untuk Endpoint regional, pilih nilai dari menu drop-down. Region default-nya adalah us-central1.

    Untuk mengetahui daftar region tempat Anda dapat menjalankan tugas Dataflow, lihat Lokasi Dataflow.

  5. Dari menu drop-down Template Dataflow, pilih the Cloud Datastream to Spanner template.
  6. Di kolom parameter yang disediakan, masukkan nilai parameter Anda.
  7. Klik Run job.

gcloud

Di shell atau terminal Anda, jalankan template:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_Spanner \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ
  

Ganti kode berikut:

  • PROJECT_ID: ID Google Cloud project tempat Anda ingin menjalankan tugas Dataflow
  • JOB_NAME: nama tugas unik pilihan Anda
  • REGION_NAME: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • GCS_FILE_PATH: jalur Cloud Storage yang digunakan untuk menyimpan peristiwa aliran data. Contoh: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE: instance Spanner Anda.
  • CLOUDSPANNER_DATABASE: database Spanner Anda.
  • DLQ: jalur Cloud Storage untuk direktori antrean error.

API

Untuk menjalankan template menggunakan REST API, kirim permintaan HTTP POST. Untuk mengetahui informasi selengkapnya tentang API dan cakupan otorisasinya, lihat projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_Spanner",
      "parameters": {
          "inputFilePattern": "GCS_FILE_PATH",
          "streamName": "STREAM_NAME"
          "instanceId": "CLOUDSPANNER_INSTANCE"
          "databaseId": "CLOUDSPANNER_DATABASE"
          "deadLetterQueueDirectory": "DLQ"
      }
   }
}
  

Ganti kode berikut:

  • PROJECT_ID: ID Google Cloud project tempat Anda ingin menjalankan tugas Dataflow
  • JOB_NAME: nama tugas unik pilihan Anda
  • LOCATION: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • GCS_FILE_PATH: jalur Cloud Storage yang digunakan untuk menyimpan peristiwa aliran data. Contoh: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE: instance Spanner Anda.
  • CLOUDSPANNER_DATABASE: database Spanner Anda.
  • DLQ: jalur Cloud Storage untuk direktori antrean error.

Langkah berikutnya