Membangun koneksi aliran perubahan menggunakan Dataflow

Halaman ini menunjukkan cara membuat pipeline Dataflow yang menggunakan dan meneruskan data perubahan Spanner menggunakan aliran perubahan. Anda dapat menggunakan kode contoh di halaman ini untuk membuat pipeline kustom.

Konsep inti

Berikut adalah beberapa konsep inti pipeline Dataflow untuk aliran perubahan.

Dataflow

Dataflow adalah layanan serverless, cepat, dan hemat biaya yang mendukung stream dan batch processing. Solusi ini memberikan portabilitas dengan tugas pemrosesan yang ditulis menggunakan library Apache Beam open source serta mengotomatiskan penyediaan infrastruktur dan pengelolaan cluster. Dataflow menyediakan streaming yang mendekati real-time, dengan sekitar enam detik latensi saat membaca dari aliran data perubahan.

Anda dapat menggunakan Dataflow untuk menggunakan aliran perubahan Spanner dengan konektor SpannerIO, yang menawarkan abstraksi melalui Spanner API untuk membuat kueri aliran perubahan. Dengan konektor ini, Anda tidak perlu mengelola siklus proses partisi aliran perubahan, yang diperlukan saat Anda menggunakan Spanner API secara langsung. Konektor ini memberi Anda aliran data perubahan data sehingga Anda dapat lebih berfokus pada logika aplikasi, bukan pada detail API tertentu dan partisi aliran perubahan dinamis. Sebaiknya gunakan konektor SpannerIO, bukan Spanner API dalam sebagian besar situasi saat Anda perlu membaca data aliran data perubahan.

Template Dataflow adalah pipeline Dataflow bawaan yang mengimplementasikan kasus penggunaan umum. Lihat Template Dataflow untuk mendapatkan ringkasan.

Pipeline Dataflow

Pipeline Dataflow stream perubahan Spanner terdiri dari empat bagian utama:

  1. Database Spanner dengan aliran data perubahan
  2. Konektor SpannerIO
  3. Transformasi dan sink yang ditentukan pengguna
  4. Penulis I/O sink

gambar

Masing-masing dibahas secara lebih mendetail di bawah.

Aliran data perubahan Spanner

Untuk mengetahui detail tentang cara membuat aliran perubahan, lihat Membuat aliran perubahan.

Konektor Apache Beam SpannerIO

Ini adalah konektor SpannerIO yang dijelaskan sebelumnya. Ini adalah konektor I/O sumber yang memancarkan PCollection kumpulan data perubahan data ke tahap pipeline berikutnya. Waktu peristiwa untuk setiap kumpulan data perubahan data yang dimunculkan akan menjadi stempel waktu commit. Perlu diperhatikan bahwa data yang dimunculkan tidak berurutan, dan konektor SpannerIO menjamin bahwa tidak akan ada data terlambat.

Saat menangani aliran perubahan, Dataflow menggunakan checkpointing. Akibatnya, setiap pekerja mungkin menunggu hingga lima detik sambil melakukan buffering perubahan sebelum mengirim perubahan untuk diproses lebih lanjut. Latensi sekitar enam detik diperkirakan.

Transformasi yang ditentukan pengguna

Transformasi yang ditentukan pengguna memungkinkan pengguna menggabungkan, mengubah, atau memodifikasi data pemrosesan dalam pipeline Dataflow. Kasus penggunaan umum untuk hal ini adalah penghapusan informasi identitas pribadi, memenuhi persyaratan format data downstream, dan pengurutan. Lihat dokumentasi resmi Apache Beam untuk panduan pemrograman tentang transforms.

Penulis I/O sink Apache Beam

Apache Beam berisi transformasi I/O bawaan yang dapat digunakan untuk menulis dari pipeline Dataflow ke sink data seperti BigQuery. Sebagian besar sink data umum didukung secara native.

Template Dataflow

Template Dataflow memberi cara mudah untuk membuat tugas Dataflow berdasarkan image Docker yang dibangun sebelumnya untuk kasus penggunaan umum melalui konsol Google Cloud, Google Cloud CLI, atau panggilan Rest API.

Untuk aliran perubahan Spanner, kami menyediakan tiga template fleksibel Dataflow:

Membangun pipeline Dataflow

Bagian ini membahas konfigurasi awal konektor, dan menyediakan contoh untuk integrasi umum dengan fungsi aliran data perubahan Spanner.

Untuk mengikuti langkah-langkah ini, Anda memerlukan lingkungan pengembangan Java untuk Dataflow. Untuk mengetahui informasi selengkapnya, lihat Membuat pipeline Dataflow menggunakan Java.

Membuat aliran perubahan

Untuk mengetahui detail tentang cara membuat aliran perubahan, lihat Membuat aliran perubahan. Untuk melanjutkan ke langkah berikutnya, Anda harus memiliki database Spanner dengan aliran perubahan yang dikonfigurasi.

Memberikan hak istimewa kontrol akses terperinci

Jika Anda mengharapkan pengguna kontrol akses terperinci dapat menjalankan tugas Dataflow, pastikan pengguna diberi akses ke peran database yang memiliki hak istimewa SELECT di aliran perubahan dan hak istimewa EXECUTE pada fungsi bernilai tabel di aliran perubahan. Selain itu, pastikan akun utama menentukan peran database dalam konfigurasi SpannerIO atau di template fleksibel Dataflow.

Untuk informasi selengkapnya, lihat Tentang kontrol akses terperinci.

Menambahkan konektor SpannerIO sebagai dependensi

Konektor Apache Beam SpannerIO merangkum kompleksitas penggunaan aliran data perubahan secara langsung melalui Cloud Spanner API, sehingga menghasilkan PCollection data aliran data perubahan ke tahap pipeline berikutnya.

Objek ini dapat digunakan dalam tahap lain pipeline Dataflow pengguna. Integrasi aliran perubahan adalah bagian dari konektor SpannerIO. Agar dapat menggunakan konektor SpannerIO, dependensi harus ditambahkan ke file pom.xml Anda:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam-version}</version> <!-- available from version 2.38.0 -->
</dependency>

Membuat database metadata

Konektor harus melacak setiap partisi saat menjalankan pipeline Apache Beam. Solusi ini menyimpan metadata ini dalam tabel Spanner yang dibuat oleh konektor selama inisialisasi. Anda menentukan database tempat tabel ini akan dibuat saat mengonfigurasi konektor.

Seperti yang dijelaskan dalam Praktik terbaik aliran data perubahan, sebaiknya buat database baru untuk tujuan ini, daripada mengizinkan konektor menggunakan database aplikasi Anda untuk menyimpan tabel metadatanya.

Pemilik tugas Dataflow yang menggunakan konektor SpannerIO harus memiliki izin IAM berikut yang ditetapkan dengan database metadata ini:

  • spanner.databases.updateDdl
  • spanner.databases.beginReadOnlyTransaction
  • spanner.databases.beginOrRollbackReadWriteTransaction
  • spanner.databases.read
  • spanner.databases.select
  • spanner.databases.write
  • spanner.sessions.create
  • spanner.sessions.get

Mengonfigurasi konektor

Konektor aliran data perubahan Spanner dapat dikonfigurasi sebagai berikut:

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");    // Needed for fine-grained access control only

Timestamp startTime = Timestamp.now();
Timestamp endTime = Timestamp.ofTimeSecondsAndNanos(
   startTime.getSeconds() + (10 * 60),
   startTime.getNanos()
);

SpannerIO
  .readChangeStream()
  .withSpannerConfig(spannerConfig)
  .withChangeStreamName("my-change-stream")
  .withMetadataInstance("my-meta-instance-id")
  .withMetadataDatabase("my-meta-database-id")
  .withMetadataTable("my-meta-table-name")
  .withRpcPriority(RpcPriority.MEDIUM)
  .withInclusiveStartAt(startTime)
  .withInclusiveEndAt(endTime);

Berikut adalah deskripsi opsi readChangeStream():

Konfigurasi Spanner (Diperlukan)

Digunakan untuk mengonfigurasi project, instance, dan database tempat aliran perubahan dibuat dan harus dibuat kuerinya. Selain itu, secara opsional menentukan peran database yang akan digunakan jika akun utama IAM yang menjalankan tugas Dataflow adalah pengguna kontrol akses yang sangat terperinci. Tugas ini mengasumsikan peran database ini untuk akses ke aliran perubahan. Untuk mengetahui informasi selengkapnya, lihat Tentang kontrol akses terperinci.

Mengubah nama aliran data (Wajib diisi)

Nama ini secara unik mengidentifikasi aliran perubahan. Nama yang ada di sini harus sama dengan yang digunakan saat membuatnya.

ID instance metadata (Opsional)

Ini adalah instance untuk menyimpan metadata yang digunakan oleh konektor untuk mengontrol pemakaian data API aliran perubahan.

ID database metadata (Wajib)

Ini adalah database untuk menyimpan metadata yang digunakan oleh konektor untuk mengontrol pemakaian data API aliran perubahan.

Nama tabel metadata (Opsional)

Hanya boleh digunakan saat memperbarui pipeline yang ada.

Ini adalah nama tabel metadata yang sudah ada sebelumnya untuk digunakan oleh konektor. Ini digunakan oleh konektor untuk menyimpan metadata guna mengontrol pemakaian data API aliran perubahan. Jika opsi ini dihilangkan, Spanner akan membuat tabel baru dengan nama yang dihasilkan saat inisialisasi konektor.

Prioritas RPC (Opsional)

Prioritas permintaan yang akan digunakan untuk kueri aliran data perubahan. Jika parameter ini dihilangkan, high priority akan digunakan.

InclusiveStartAt (Wajib)

Perubahan dari stempel waktu yang diberikan akan ditampilkan ke pemanggil.

InklusiveEndAt (Opsional)

Perubahan hingga stempel waktu yang ditentukan akan ditampilkan ke pemanggil. Jika parameter ini dihilangkan, perubahan akan diterapkan tanpa batas.

Menambahkan transformasi dan sink untuk memproses data perubahan

Setelah langkah sebelumnya selesai, konektor SpannerIO yang dikonfigurasi siap memunculkan PCollection objek DataChangeRecord. Lihat Contoh transformasi dan sink untuk mengetahui beberapa contoh konfigurasi pipeline yang memproses data yang di-streaming ini dalam berbagai cara.

Perlu diperhatikan bahwa data aliran data perubahan yang dikeluarkan oleh konektor SpannerIO tidak diurutkan. Hal ini karena PCollections tidak memberikan jaminan pemesanan apa pun. Jika memerlukan streaming yang diurutkan, Anda harus mengelompokkan dan mengurutkan kumpulan data sebagai transformasi dalam pipeline: lihat Contoh: Urutkan menurut kunci. Anda dapat memperluas sampel ini untuk mengurutkan kumpulan data berdasarkan kolom catatan apa pun, seperti menurut ID transaksi.

Contoh transformasi dan sink

Anda dapat menentukan transformasi sendiri dan menentukan sink sebagai tempat penulisan data. Dokumentasi Apache Beam menyediakan berbagai transforms yang dapat diterapkan, serta siap menggunakan konektor I/O untuk menulis data ke dalam sistem eksternal.

Contoh: Urutkan menurut kunci

Contoh kode ini memunculkan kumpulan data perubahan yang diurutkan berdasarkan stempel waktu commit dan dikelompokkan menurut kunci utama menggunakan konektor Dataflow.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new BreakRecordByModFn()))
  .apply(ParDo.of(new KeyByIdFn()))
  .apply(ParDo.of(new BufferKeyUntilOutputTimestamp()))
  // Subsequent processing goes here

Contoh kode ini menggunakan status dan timer untuk menyangga kumpulan data untuk setiap kunci, dan menetapkan waktu habis masa berlaku timer ke beberapa waktu yang dikonfigurasi pengguna T di masa mendatang (ditentukan dalam fungsi BufferKeyUntilOutputTimestamp). Saat watermark Dataflow melewati waktu T, kode ini akan menghapus semua kumpulan data dalam buffer dengan stempel waktu kurang dari T, mengurutkan kumpulan data ini berdasarkan stempel waktu commit, dan menghasilkan pasangan nilai kunci dengan:

  • Kuncinya adalah kunci input, yaitu kunci utama yang di-hash ke array bucket ukuran 1.000.
  • Nilainya adalah record perubahan data yang diurutkan yang di-buffer untuk kunci.

Untuk setiap kunci, kami memiliki jaminan berikut:

  • Timer dijamin akan diaktifkan sesuai urutan stempel waktu habis masa berlaku.
  • Panggung downstream dijamin menerima elemen dalam urutan yang sama seperti saat diproduksi.

Misalnya, untuk kunci dengan nilai 100, timer akan diaktifkan masing-masing pada T1 dan T10, yang menghasilkan paket kumpulan data perubahan data pada setiap stempel waktu. Karena kumpulan data perubahan data yang dihasilkan pada T1 dihasilkan sebelum kumpulan data perubahan yang di-output pada T10, kumpulan data perubahan data yang dihasilkan pada T1 juga dijamin akan diterima oleh tahap berikutnya sebelum kumpulan data perubahan data yang dihasilkan pada T10. Mekanisme ini membantu kami menjamin pengurutan stempel waktu commit yang ketat per kunci utama untuk pemrosesan downstream.

Proses ini akan berulang hingga pipeline berakhir dan semua catatan perubahan data telah diproses (atau akan berulang tanpa batas jika waktu berakhir tidak ditentukan).

Perhatikan bahwa contoh kode ini menggunakan status dan timer, bukan windows, untuk melakukan pengurutan per kunci. Alasannya adalah jendela tersebut tidak dijamin akan diproses secara berurutan. Ini berarti bahwa periode yang lebih lama dapat diproses lebih lambat dari periode yang lebih baru, yang dapat menyebabkan pemrosesan urutan yang tidak sesuai.

BreakRecordByModFn

Setiap kumpulan data perubahan data dapat berisi beberapa mod. Setiap mod mewakili penyisipan, pembaruan, atau penghapusan untuk satu nilai kunci utama. Fungsi ini memecah setiap kumpulan data perubahan data menjadi catatan perubahan data terpisah, satu per mod.

private static class BreakRecordByModFn extends DoFn<DataChangeRecord,
                                                     DataChangeRecord>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record, OutputReceiver<DataChangeRecord>
    outputReceiver) {
    record.getMods().stream()
      .map(
          mod ->
              new DataChangeRecord(
                  record.getPartitionToken(),
                  record.getCommitTimestamp(),
                  record.getServerTransactionId(),
                  record.isLastRecordInTransactionInPartition(),
                  record.getRecordSequence(),
                  record.getTableName(),
                  record.getRowType(),
                  Collections.singletonList(mod),
                  record.getModType(),
                  record.getValueCaptureType(),
                  record.getNumberOfRecordsInTransaction(),
                  record.getNumberOfPartitionsInTransaction(),
                  record.getTransactionTag(),
                  record.isSystemTransaction(),
                  record.getMetadata()))
      .forEach(outputReceiver::output);
  }
}

KeyByIdFn

Fungsi ini menggunakan DataChangeRecord dan menghasilkan DataChangeRecord yang dikunci oleh kunci utama Spanner yang di-hash ke nilai bilangan bulat.

private static class KeyByIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  // NUMBER_OF_BUCKETS should be configured by the user to match their key cardinality
  // Here, we are choosing to hash the Spanner primary keys to a bucket index, in order to have a deterministic number
  // of states and timers for performance purposes.
  // Note that having too many buckets might have undesirable effects if it results in a low number of records per bucket
  // On the other hand, having too few buckets might also be problematic, since many keys will be contained within them.
  private static final int NUMBER_OF_BUCKETS = 1000;

  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    int hashCode = (int) record.getMods().get(0).getKeysJson().hashCode();
    // Hash the received keys into a bucket in order to have a
    // deterministic number of buffers and timers.
    String bucketIndex = String.valueOf(hashCode % NUMBER_OF_BUCKETS);

    outputReceiver.output(KV.of(bucketIndex, record));
  }
}

BufferKeyUntilOutputTimestamp

Timer dan buffer bersifat per kunci. Fungsi ini melakukan buffering setiap kumpulan data perubahan data hingga watermark meneruskan stempel waktu saat kita ingin menghasilkan catatan perubahan data yang di-buffer.

Kode ini menggunakan timer berulang untuk menentukan kapan harus menghapus buffer:

  1. Saat melihat kumpulan data perubahan data untuk sebuah kunci untuk pertama kalinya, sistem akan menyetel timer untuk diaktifkan pada stempel waktu commit kumpulan data perubahan data + incrementIntervalSeconds (opsi yang dapat dikonfigurasi pengguna).
  2. Saat timer diaktifkan, timer akan menambahkan semua kumpulan data perubahan data dalam buffer dengan stempel waktu yang lebih singkat dari waktu habis masa berlaku timer ke recordsToOutput. Jika buffer memiliki kumpulan data perubahan data yang stempel waktunya lebih besar dari atau sama dengan waktu habis masa berlaku timer, buffer akan menambahkan kumpulan data perubahan data tersebut kembali ke buffer, bukan menghasilkan output-nya. Kemudian, timer berikutnya akan disetel ke waktu habis masa berlaku timer saat ini ditambah incrementIntervalInSeconds.
  3. Jika recordsToOutput tidak kosong, fungsi akan mengurutkan kumpulan data perubahan data di recordsToOutput berdasarkan stempel waktu commit dan ID transaksi, lalu menghasilkan output tersebut.
private static class BufferKeyUntilOutputTimestamp extends
    DoFn<KV<String, DataChangeRecord>, KV<String, Iterable<DataChangeRecord>>>  {
  private static final Logger LOG =
      LoggerFactory.getLogger(BufferKeyUntilOutputTimestamp.class);

  private final long incrementIntervalInSeconds = 2;

  private BufferKeyUntilOutputTimestamp(long incrementIntervalInSeconds) {
    this.incrementIntervalInSeconds = incrementIntervalInSeconds;
  }

  @SuppressWarnings("unused")
  @TimerId("timer")
  private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("keyString")
  private final StateSpec<ValueState<String>> keyString =
      StateSpecs.value(StringUtf8Coder.of());

  @ProcessElement
  public void process(
      @Element KV<String, DataChangeRecord> element,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    buffer.add(element.getValue());

    // Only set the timer if this is the first time we are receiving a data change
    // record with this key.
    String elementKey = keyString.read();
    if (elementKey == null) {
      Instant commitTimestamp =
          new Instant(element.getValue().getCommitTimestamp().toSqlTimestamp());
      Instant outputTimestamp =
          commitTimestamp.plus(Duration.standardSeconds(incrementIntervalInSeconds));
      timer.set(outputTimestamp);
      keyString.write(element.getKey());
    }
  }

  @OnTimer("timer")
  public void onExpiry(
      OnTimerContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    if (!buffer.isEmpty().read()) {
      String elementKey = keyString.read();

      final List<DataChangeRecord> records =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .collect(Collectors.toList());
      buffer.clear();

      List<DataChangeRecord> recordsToOutput = new ArrayList<>();
      for (DataChangeRecord record : records) {
        Instant recordCommitTimestamp =
            new Instant(record.getCommitTimestamp().toSqlTimestamp());
        final String recordString =
            record.getMods().get(0).getNewValuesJson().isEmpty()
                ? "Deleted record"
                : record.getMods().get(0).getNewValuesJson();
        // When the watermark passes time T, this means that all records with
        // event time < T have been processed and successfully committed. Since the
        // timer fires when the watermark passes the expiration time, we should
        // only output records with event time < expiration time.
        if (recordCommitTimestamp.isBefore(context.timestamp())) {
          LOG.info(
             "Outputting record with key {} and value {} at expiration " +
             "timestamp {}",
              elementKey,
              recordString,
              context.timestamp().toString());
          recordsToOutput.add(record);
        } else {
          LOG.info(
              "Expired at {} but adding record with key {} and value {} back to " +
              "buffer due to commit timestamp {}",
              context.timestamp().toString(),
              elementKey,
              recordString,
              recordCommitTimestamp.toString());
          buffer.add(record);
        }
      }

      // Output records, if there are any to output.
      if (!recordsToOutput.isEmpty()) {
        // Order the records in place, and output them. The user would need
        // to implement DataChangeRecordComparator class that sorts the
        // data change records by commit timestamp and transaction ID.
        Collections.sort(recordsToOutput, new DataChangeRecordComparator());
        context.outputWithTimestamp(
            KV.of(elementKey, recordsToOutput), context.timestamp());
        LOG.info(
            "Expired at {}, outputting records for key {}",
            context.timestamp().toString(),
            elementKey);
      } else {
        LOG.info("Expired at {} with no records", context.timestamp().toString());
      }
    }

    Instant nextTimer = context.timestamp().plus(Duration.standardSeconds(incrementIntervalInSeconds));
    if (buffer.isEmpty() != null && !buffer.isEmpty().read()) {
      LOG.info("Setting next timer to {}", nextTimer.toString());
      timer.set(nextTimer);
    } else {
      LOG.info(
          "Timer not being set since the buffer is empty: ");
      keyString.clear();
    }
  }
}

Transaksi pemesanan

Pipeline ini dapat diubah ke urutan berdasarkan ID transaksi dan stempel waktu commit. Untuk melakukannya, buffer data untuk setiap pasangan ID transaksi / stempel waktu commit, bukan untuk setiap kunci Spanner. Tindakan ini memerlukan modifikasi kode di KeyByIdFn.

Contoh: Mengumpulkan transaksi

Contoh kode ini membaca catatan perubahan data, merakit semua catatan perubahan data milik transaksi yang sama menjadi satu elemen, dan menghasilkan elemen tersebut. Perhatikan bahwa transaksi yang dihasilkan oleh kode contoh ini tidak diurutkan berdasarkan stempel waktu commit.

Contoh kode ini menggunakan buffer untuk menyusun transaksi dari record perubahan data. Setelah menerima catatan perubahan data milik transaksi untuk pertama kalinya, fungsi ini akan membaca kolom numberOfRecordsInTransaction dalam kumpulan data perubahan data, yang menjelaskan jumlah catatan perubahan data yang diharapkan milik transaksi tersebut. Fungsi ini melakukan buffering kumpulan data perubahan data milik transaksi tersebut hingga jumlah kumpulan data yang di-buffer cocok dengan numberOfRecordsInTransaction, yang kemudian menghasilkan catatan perubahan data yang dipaketkan.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new KeyByTransactionIdFn()))
  .apply(ParDo.of(new TransactionBoundaryFn()))
  // Subsequent processing goes here

KeyByTransactionIdFn

Fungsi ini menggunakan DataChangeRecord dan menghasilkan DataChangeRecord yang dikunci oleh ID transaksi.

private static class KeyByTransactionIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    outputReceiver.output(KV.of(record.getServerTransactionId(), record));
  }
}

TransactionBoundaryFn

Buffer TransactionBoundaryFn menerima key-value pair {TransactionId, DataChangeRecord} dari KeyByTransactionIdFn dan mem-buffer-nya dalam grup berdasarkan TransactionId. Jika jumlah data yang di-buffer sama dengan jumlah kumpulan data yang terdapat dalam seluruh transaksi, fungsi ini akan mengurutkan objek DataChangeRecord dalam grup menurut urutan kumpulan data dan menghasilkan pasangan nilai kunci {CommitTimestamp, TransactionId}, Iterable<DataChangeRecord>.

Di sini, kita mengasumsikan bahwa SortKey adalah class yang ditentukan pengguna yang mewakili pasangan {CommitTimestamp, TransactionId}. Lihat contoh implementasi untuk SortKey.

private static class TransactionBoundaryFn extends DoFn<KV<String, DataChangeRecord>, KV<SortKey, Iterable<DataChangeRecord>>>  {
  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("count")
  private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();

  @ProcessElement
  public void process(
      ProcessContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @StateId("count") ValueState<Integer> countState) {
    final KV<String, DataChangeRecord> element = context.element();
    final DataChangeRecord record = element.getValue();

    buffer.add(record);
    int count = (countState.read() != null ? countState.read() : 0);
    count = count + 1;
    countState.write(count);

    if (count == record.getNumberOfRecordsInTransaction()) {
      final List<DataChangeRecord> sortedRecords =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .sorted(Comparator.comparing(DataChangeRecord::getRecordSequence))
              .collect(Collectors.toList());

      final Instant commitInstant =
          new Instant(sortedRecords.get(0).getCommitTimestamp().toSqlTimestamp()
              .getTime());
      context.outputWithTimestamp(
          KV.of(
              new SortKey(sortedRecords.get(0).getCommitTimestamp(),
                          sortedRecords.get(0).getServerTransactionId()),
              sortedRecords),
          commitInstant);
      buffer.clear();
      countState.clear();
    }
  }
}

Contoh: Memfilter menurut tag transaksi

Ketika transaksi yang mengubah data pengguna diberi tag, tag yang sesuai dan jenisnya akan disimpan sebagai bagian dari DataChangeRecord. Contoh-contoh ini menunjukkan cara memfilter catatan aliran data perubahan berdasarkan tag transaksi yang ditentukan pengguna serta tag sistem:

Pemfilteran tag yang ditentukan pengguna untuk my-tx-tag:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           !record.isSystemTransaction()
           && record.getTransactionTag().equalsIgnoreCase("my-tx-tag")))
  // Subsequent processing goes here

Pemfilteran tag sistem/pengauditan TTL:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           record.isSystemTransaction()
           && record.getTransactionTag().equals("RowDeletionPolicy")))
  // Subsequent processing goes here

Contoh: Mengambil baris lengkap

Contoh ini berfungsi dengan tabel Spanner bernama Singer yang memiliki definisi berikut:

CREATE TABLE Singers (
  SingerId INT64 NOT NULL,
  FirstName STRING(1024),
  LastName STRING(1024)
) PRIMARY KEY (SingerId);

Pada mode pengambilan nilai OLD_AND_NEW_VALUES default untuk aliran data perubahan, saat ada pembaruan pada baris Spanner, catatan perubahan data yang diterima hanya akan berisi kolom yang diubah. Kolom yang dilacak tetapi tidak berubah tidak akan disertakan dalam data. Kunci utama mod dapat digunakan untuk melakukan pembacaan snapshot Spanner pada stempel waktu commit dari kumpulan data perubahan data untuk mengambil kolom yang tidak diubah atau bahkan mengambil baris lengkap.

Perlu diperhatikan bahwa kebijakan retensi database mungkin perlu diubah ke nilai yang lebih besar atau sama dengan kebijakan retensi aliran data perubahan agar pembacaan snapshot berhasil.

Perhatikan juga bahwa menggunakan jenis pengambilan nilai NEW_ROW adalah cara yang direkomendasikan dan lebih efisien untuk melakukannya, karena jenis ini menampilkan semua kolom baris yang dilacak secara default dan tidak memerlukan pembacaan snapshot tambahan ke Spanner.

SpannerConfig spannerConfig = SpannerConfig
   .create()
   .withProjectId("my-project-id")
   .withInstanceId("my-instance-id")
   .withDatabaseId("my-database-id")
   .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
   .apply(SpannerIO
       .readChangeStream()
       .withSpannerConfig(spannerConfig)
       // Assume we have a change stream "my-change-stream" that watches Singers table.
       .withChangeStreamName("my-change-stream")
       .withMetadataInstance("my-metadata-instance-id")
       .withMetadataDatabase("my-metadata-database-id")
       .withInclusiveStartAt(Timestamp.now()))
   .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
   // Subsequent processing goes here

ToFullRowJsonFn

Transformasi ini akan melakukan pembacaan yang sudah tidak berlaku pada stempel waktu commit dari setiap kumpulan data yang diterima, dan memetakan baris lengkapnya ke JSON.

public class ToFullRowJsonFn extends DoFn<DataChangeRecord, String> {
 // Since each instance of this DoFn will create its own session pool and will
 // perform calls to Spanner sequentially, we keep the number of sessions in
 // the pool small. This way, we avoid wasting resources.
 private static final int MIN_SESSIONS = 1;
 private static final int MAX_SESSIONS = 5;
 private final String projectId;
 private final String instanceId;
 private final String databaseId;

 private transient DatabaseClient client;
 private transient Spanner spanner;

 public ToFullRowJsonFn(SpannerConfig spannerConfig) {
   this.projectId = spannerConfig.getProjectId().get();
   this.instanceId = spannerConfig.getInstanceId().get();
   this.databaseId = spannerConfig.getDatabaseId().get();
 }

 @Setup
 public void setup() {
   SessionPoolOptions sessionPoolOptions = SessionPoolOptions
      .newBuilder()
      .setMinSessions(MIN_SESSIONS)
      .setMaxSessions(MAX_SESSIONS)
      .build();
   SpannerOptions options = SpannerOptions
       .newBuilder()
       .setProjectId(projectId)
       .setSessionPoolOption(sessionPoolOptions)
       .build();
   DatabaseId id = DatabaseId.of(projectId, instanceId, databaseId);
   spanner = options.getService();
   client = spanner.getDatabaseClient(id);
 }

 @Teardown
 public void teardown() {
   spanner.close();
 }

 @ProcessElement
 public void process(
   @Element DataChangeRecord element,
   OutputReceiver<String> output) {
   com.google.cloud.Timestamp commitTimestamp = element.getCommitTimestamp();
   element.getMods().forEach(mod -> {
     JSONObject keysJson = new JSONObject(mod.getKeysJson());
     JSONObject newValuesJson = new JSONObject(mod.getNewValuesJson());
     ModType modType = element.getModType();
     JSONObject jsonRow = new JSONObject();
     long singerId = keysJson.getLong("SingerId");
     jsonRow.put("SingerId", singerId);
     if (modType == ModType.INSERT) {
       // For INSERT mod, get non-primary key columns from mod.
       jsonRow.put("FirstName", newValuesJson.get("FirstName"));
       jsonRow.put("LastName", newValuesJson.get("LastName"));
     } else if (modType == ModType.UPDATE) {
       // For UPDATE mod, get non-primary key columns by doing a snapshot read using the primary key column from mod.
       try (ResultSet resultSet = client
         .singleUse(TimestampBound.ofReadTimestamp(commitTimestamp))
         .read(
           "Singers",
           KeySet.singleKey(com.google.cloud.spanner.Key.of(singerId)),
             Arrays.asList("FirstName", "LastName"))) {
         if (resultSet.next()) {
           jsonRow.put("FirstName", resultSet.isNull("FirstName") ?
             JSONObject.NULL : resultSet.getString("FirstName"));
           jsonRow.put("LastName", resultSet.isNull("LastName") ?
             JSONObject.NULL : resultSet.getString("LastName"));
         }
       }
     } else {
       // For DELETE mod, there is nothing to do, as we already set SingerId.
     }

     output.output(jsonRow.toString());
   });
 }
}

Kode ini membuat klien database Spanner untuk melakukan pengambilan baris lengkap, dan mengonfigurasi kumpulan sesi agar memiliki beberapa sesi saja, serta melakukan pembacaan dalam satu instance ToFullRowJsonFn secara berurutan. Dataflow memastikan untuk memunculkan banyak instance fungsi ini, masing-masing dengan kumpulan kliennya sendiri.

Contoh: Spanner ke Pub/Sub

Dalam skenario ini, pemanggil melakukan streaming rekaman ke Pub/Sub secepat mungkin, tanpa pengelompokan atau agregasi. Ini cocok untuk memicu pemrosesan downstream, karena melakukan streaming semua baris baru yang disisipkan ke dalam tabel Spanner ke Pub/Sub untuk diproses lebih lanjut.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(PubsubIO.writeStrings().to("my-topic"));

Perhatikan bahwa sink Pub/Sub dapat dikonfigurasi untuk memastikan semantik tepat satu kali.

Contoh: Spanner ke Cloud Storage

Dalam skenario ini, pemanggil mengelompokkan semua data dalam jendela tertentu dan menyimpan grup dalam file Cloud Storage terpisah. Ini cocok untuk analisis dan pengarsipan point-in-time, yang terpisah dari periode retensi Spanner.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
  .apply(TextIO
    .write()
    .to("gs://my-bucket/change-stream-results-")
    .withSuffix(".txt")
    .withWindowedWrites()
    .withNumShards(1));

Perhatikan bahwa sink Cloud Storage menyediakan semantik setidaknya satu kali secara default. Dengan pemrosesan tambahan, dapat dimodifikasi agar memiliki semantik tepat satu kali.

Kami juga menyediakan template Dataflow untuk kasus penggunaan ini: lihat Menghubungkan aliran data perubahan ke Cloud Storage.

Contoh: Spanner ke BigQuery (tabel besar)

Di sini, pemanggil mengalirkan data perubahan ke BigQuery. Setiap kumpulan data perubahan data ditampilkan sebagai satu baris di BigQuery. Tampilan ini cocok untuk analisis. Kode ini menggunakan fungsi yang ditentukan sebelumnya, di bagian Ambil baris lengkap, untuk mengambil baris lengkap data dan menulisnya ke BigQuery.

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(spannerConfig)
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
  .apply(BigQueryIO
    .<String>write()
    .to("my-bigquery-table")
    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
    .withSchema(new TableSchema().setFields(Arrays.asList(
      new TableFieldSchema()
        .setName("SingerId")
        .setType("INT64")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("FirstName")
        .setType("STRING")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("LastName")
        .setType("STRING")
        .setMode("REQUIRED")
    )))
    .withAutoSharding()
    .optimizedWrites()
    .withFormatFunction((String element) -> {
      ObjectMapper objectMapper = new ObjectMapper();
      JsonNode jsonNode = null;
      try {
        jsonNode = objectMapper.readTree(element);
      } catch (IOException e) {
        e.printStackTrace();
      }
      return new TableRow()
        .set("SingerId", jsonNode.get("SingerId").asInt())
        .set("FirstName", jsonNode.get("FirstName").asText())
        .set("LastName", jsonNode.get("LastName").asText());
    }
  )
);

Perhatikan bahwa sink BigQuery menyediakan semantik setidaknya-satu kali secara default. Dengan pemrosesan tambahan, dapat dimodifikasi agar memiliki semantik tepat satu kali.

Kami juga menyediakan template Dataflow untuk kasus penggunaan ini; lihat Menghubungkan aliran data perubahan ke BigQuery.

Memantau pipeline

Ada dua kelas metrik yang tersedia untuk memantau pipeline Dataflow aliran perubahan.

Metrik Dataflow standar

Dataflow menyediakan beberapa metrik untuk memastikan tugas Anda berjalan dengan baik, seperti keaktualan data, jeda sistem, throughput tugas, penggunaan CPU pekerja, dan lainnya. Anda dapat menemukan informasi selengkapnya di artikel Menggunakan Pemantauan untuk pipeline Dataflow.

Untuk pipeline aliran perubahan, ada dua metrik utama yang harus dipertimbangkan: latensi sistem dan keaktualan data.

Latensi sistem akan memberi tahu Anda durasi waktu maksimum saat ini (dalam detik) saat item data diproses atau menunggu pemrosesan.

Keaktualan data akan menunjukkan jumlah waktu antara saat ini (real time) dan watermark output. Watermark output waktu T menunjukkan bahwa semua elemen dengan waktu peristiwa (secara ketat) sebelum T telah diproses untuk komputasi. Dengan kata lain, keaktualan data mengukur seberapa baru pipeline dalam memproses peristiwa yang telah diterima.

Jika sumber daya pipeline kurang, Anda dapat melihat efek tersebut dalam kedua metrik ini. Latensi sistem akan meningkat karena item harus menunggu lebih lama sebelum diproses. Keaktualan data juga akan meningkat karena pipeline tidak akan dapat mengimbangi jumlah data yang diterima.

Metrik aliran data perubahan kustom

Metrik ini diekspos dalam Cloud Monitoring dan mencakup:

  • Latensi bucket (histogram) antara kumpulan data yang di-commit di Spanner ke data tersebut dimunculkan ke dalam PCollection oleh konektor. Metrik ini dapat digunakan untuk melihat masalah performa (latensi) pada pipeline.
  • Jumlah total catatan data yang dibaca. Ini adalah indikasi keseluruhan jumlah catatan yang dikeluarkan oleh konektor. Jumlah ini seharusnya akan terus meningkat, mencerminkan tren penulisan di database Spanner yang mendasarinya.
  • Jumlah partisi yang sedang dibaca. Harus selalu ada partisi yang dibaca. Jika angka ini nol, menunjukkan bahwa telah terjadi error dalam pipeline.
  • Jumlah total kueri yang dikeluarkan selama eksekusi konektor. Ini merupakan indikasi keseluruhan dari kueri aliran data perubahan yang dibuat ke instance Spanner selama eksekusi pipeline. Ini dapat digunakan untuk mendapatkan estimasi beban dari konektor ke database Spanner.

Memperbarui pipeline yang ada

Anda dapat memperbarui pipeline yang sedang berjalan yang menggunakan konektor SpannerIO untuk memproses aliran perubahan jika pemeriksaan kompatibilitas tugas lulus. Untuk melakukannya, Anda harus secara eksplisit menetapkan parameter nama tabel metadata tugas baru saat memperbaruinya. Gunakan nilai opsi pipeline metadataTable dari tugas yang Anda update.

Jika Anda menggunakan template Dataflow yang disediakan Google, tetapkan nama tabel menggunakan parameter spannerMetadataTableName. Anda juga dapat mengubah tugas yang ada untuk menggunakan tabel metadata secara eksplisit dengan metode withMetadataTable(your-metadata-table-name) dalam konfigurasi konektor. Setelah selesai, Anda dapat mengikuti petunjuk di Meluncurkan tugas pengganti dari dokumen Dataflow untuk memperbarui tugas yang sedang berjalan.

Praktik terbaik untuk aliran data perubahan dan Dataflow

Berikut adalah beberapa praktik terbaik untuk membangun koneksi aliran perubahan dengan menggunakan Dataflow.

Menggunakan database metadata terpisah

Sebaiknya buat database terpisah untuk konektor SpannerIO yang akan digunakan untuk penyimpanan metadata, bukan mengonfigurasinya agar menggunakan database aplikasi Anda.

Untuk informasi selengkapnya, lihat Mempertimbangkan database metadata terpisah.

Menyesuaikan ukuran cluster

Aturan praktis untuk jumlah awal pekerja dalam tugas aliran data perubahan Spanner adalah satu pekerja per 1.000 penulisan per detik. Perhatikan bahwa estimasi ini dapat bervariasi, bergantung pada beberapa faktor, seperti ukuran setiap transaksi, jumlah catatan aliran perubahan yang dihasilkan dari satu transaksi dan transformasi, agregasi, atau sink lainnya yang digunakan dalam pipeline.

Setelah penyediaan sumber daya awal, penting untuk memantau metrik yang disebutkan dalam Memantau pipeline, untuk memastikan bahwa pipeline berjalan dengan baik. Sebaiknya lakukan eksperimen dengan ukuran kumpulan pekerja awal dan pantau cara pipeline Anda menangani beban, dengan meningkatkan jumlah node jika perlu. Pemakaian CPU adalah metrik utama untuk memeriksa apakah beban sudah tepat dan ada lebih banyak node yang diperlukan.

Batasan umum

Penskalaan otomatis

Dukungan penskalaan otomatis untuk semua pipeline yang menyertakan SpannerIO.readChangeStream memerlukan Apache Beam 2.39.0 atau yang lebih tinggi.

Jika Anda menggunakan versi Apache Beam sebelum 2.39.0, pipeline yang menyertakan SpannerIO.readChangeStream harus secara eksplisit menentukan algoritma penskalaan otomatis sebagai NONE, seperti yang dijelaskan dalam Penskalaan horizontal otomatis.

Untuk menskalakan pipeline Dataflow secara manual, bukan menggunakan penskalaan otomatis, lihat Penskalaan pipeline streaming secara manual.

Pelari V2

Konektor aliran data perubahan Spanner memerlukan Dataflow Runner V2. Ini harus ditentukan secara manual selama eksekusi. Jika tidak, error akan ditampilkan. Anda dapat menentukan Runner V2 dengan mengonfigurasi tugas menggunakan --experiments=use_unified_worker,use_runner_v2.

Mengambil snapshot

Konektor aliran data perubahan Spanner tidak mendukung Snapshot Dataflow.

Menyelesaikan

Konektor aliran data perubahan Spanner tidak mendukung pengosongan tugas. Anda hanya dapat membatalkan tugas yang sudah ada.

Anda juga dapat memperbarui pipeline yang ada tanpa perlu menghentikannya.

OpenCensus

Untuk menggunakan OpenCensus guna memantau pipeline, tentukan versi 0.28.3 atau yang lebih baru.

NullPointerException saat pipeline dimulai

Bug di Apache Beam versi 2.38.0 dapat menyebabkan NullPointerException saat memulai pipeline dalam kondisi tertentu. Ini akan mencegah tugas Anda dimulai, dan menampilkan pesan error ini:

java.lang.NullPointerException: null value in entry: Cloud Storage_PROJECT_ID=null

Untuk mengatasi masalah ini, gunakan Apache Beam versi 2.39.0 atau yang lebih baru, atau tentukan versi beam-sdks-java-core secara manual sebagai 2.37.0:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>2.37.0</version>
</dependency>

Informasi selengkapnya