Mengimpor, mengekspor, dan mengubah data menggunakan Dataflow

Halaman ini menjelaskan cara menggunakan konektor Dataflow untuk Spanner guna mengimpor, mengekspor, dan mengubah data di database dengan dialek GoogleSQL dan database dengan dialek PostgreSQL di Spanner.

Dataflow adalah layanan terkelola untuk mengubah dan memperkaya data. Konektor Dataflow untuk Spanner memungkinkan Anda membaca data dari dan menulis data ke Spanner di pipeline Dataflow, dengan opsi untuk mengubah atau memodifikasi data. Anda juga dapat membuat pipeline yang mentransfer data antara Spanner dan Google Cloud produk lainnya.

Konektor Dataflow adalah metode yang direkomendasikan untuk memindahkan data secara efisien ke dan dari Spanner dalam jumlah besar. Metode ini juga merupakan metode yang direkomendasikan untuk melakukan transformasi besar pada database yang tidak didukung oleh DML Berpartisi, seperti pemindahan tabel dan penghapusan massal yang memerlukan JOIN. Saat bekerja dengan database individual, ada metode lain yang dapat Anda gunakan untuk mengimpor dan mengekspor data:

  • Gunakan konsol Google Cloud untuk mengekspor database individual dari Spanner ke Cloud Storage dalam format Avro.
  • Gunakan konsol Google Cloud untuk mengimpor database kembali ke Spanner dari file yang Anda ekspor ke Cloud Storage.
  • Gunakan REST API atau Google Cloud CLI untuk menjalankan tugas ekspor atau impor dari Spanner ke Cloud Storage dan kembali menggunakan format Avro.

Konektor Dataflow untuk Spanner adalah bagian dari Apache Beam Java SDK, dan menyediakan API untuk melakukan tindakan sebelumnya. Untuk mengetahui informasi selengkapnya tentang beberapa konsep yang dibahas di halaman ini, seperti objek PCollection dan transformasi, lihat panduan pemrograman Apache Beam.

Tambahkan konektor ke project Maven Anda

Untuk menambahkan konektor Dataflow Google Cloud ke project Maven, tambahkan artefak Maven beam-sdks-java-io-google-cloud-platform ke file pom.xml Anda sebagai dependensi.

Misalnya, dengan asumsi bahwa file pom.xml Anda menetapkan beam.version ke nomor versi yang sesuai, Anda akan menambahkan dependensi berikut:

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>${beam.version}</version>
</dependency>

Membaca data dari Spanner

Untuk membaca dari Spanner, terapkan transformasi SpannerIO.read. Konfigurasi pembacaan menggunakan metode di class SpannerIO.Read. Penerapan transformasi akan menampilkan PCollection<Struct>, dengan setiap elemen dalam koleksi mewakili setiap baris yang ditampilkan oleh operasi baca. Anda dapat membaca dari Spanner dengan dan tanpa kueri SQL tertentu, bergantung pada output yang Anda butuhkan.

Menerapkan transformasi SpannerIO.read akan menampilkan tampilan data yang konsisten dengan melakukan pembacaan yang kuat. Kecuali jika Anda menentukan lain, hasil pembacaan diambil snapshot-nya pada saat Anda memulai pembacaan. Lihat bacaan untuk mengetahui informasi selengkapnya tentang berbagai jenis bacaan yang dapat dilakukan Spanner.

Membaca data menggunakan kueri

Untuk membaca kumpulan data tertentu dari Spanner, konfigurasi transform menggunakan metode SpannerIO.Read.withQuery untuk menentukan kueri SQL. Contoh:

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withQuery("SELECT * FROM " + options.getTable()));

Membaca data tanpa menentukan kueri

Untuk membaca dari database tanpa menggunakan kueri, Anda dapat menentukan nama tabel menggunakan metode SpannerIO.Read.withTable, dan menentukan daftar kolom yang akan dibaca menggunakan metode SpannerIO.Read.withColumns. Contoh:

GoogleSQL

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withTable("Singers")
        .withColumns("singerId", "firstName", "lastName"));

PostgreSQL

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withTable("singers")
        .withColumns("singer_id", "first_name", "last_name"));

Untuk membatasi baris yang dibaca, Anda dapat menentukan serangkaian kunci utama yang akan dibaca menggunakan metode SpannerIO.Read.withKeySet.

Anda juga dapat membaca tabel menggunakan indeks sekunder yang ditentukan. Seperti halnya panggilan API readUsingIndex, indeks harus berisi semua data yang muncul dalam hasil kueri.

Untuk melakukannya, tentukan tabel seperti yang ditunjukkan dalam contoh sebelumnya, dan tentukan indeks yang berisi nilai kolom yang diperlukan menggunakan metode SpannerIO.Read.withIndex. Indeks harus menyimpan semua kolom yang perlu dibaca oleh transformasi. Kunci utama tabel dasar disimpan secara implisit. Misalnya, untuk membaca tabel Songs menggunakan indeks SongsBySongName, Anda menggunakan kode berikut:

GoogleSQL

// Read the indexed columns from all rows in the specified index.
PCollection<Struct> records =
    pipeline.apply(
        SpannerIO.read()
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId)
            .withTable("Songs")
            .withIndex("SongsBySongName")
            // Can only read columns that are either indexed, STORED in the index or
            // part of the primary key of the Songs table,
            .withColumns("SingerId", "AlbumId", "TrackId", "SongName"));

PostgreSQL

// // Read the indexed columns from all rows in the specified index.
PCollection<Struct> records =
    pipeline.apply(
        SpannerIO.read()
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId)
            .withTable("Songs")
            .withIndex("SongsBySongName")
            // Can only read columns that are either indexed, STORED in the index or
            // part of the primary key of the songs table,
            .withColumns("singer_id", "album_id", "track_id", "song_name"));

Mengontrol keusangan data transaksi

Transformasi dijamin akan dieksekusi pada snapshot data yang konsisten. Untuk mengontrol ketidakberlakuan data, gunakan metode SpannerIO.Read.withTimestampBound. Lihat transaksi untuk mengetahui informasi selengkapnya.

Membaca dari beberapa tabel dalam transaksi yang sama

Jika Anda ingin membaca data dari beberapa tabel pada titik waktu yang sama untuk memastikan konsistensi data, lakukan semua pembacaan dalam satu transaksi. Untuk melakukannya, terapkan transformasi createTransaction, buat objek PCollectionView<Transaction> yang kemudian membuat transaksi. Tampilan yang dihasilkan dapat diteruskan ke operasi baca menggunakan SpannerIO.Read.withTransaction.

GoogleSQL

SpannerConfig spannerConfig =
    SpannerConfig.create().withInstanceId(instanceId).withDatabaseId(databaseId);
PCollectionView<Transaction> tx =
    pipeline.apply(
        SpannerIO.createTransaction()
            .withSpannerConfig(spannerConfig)
            .withTimestampBound(TimestampBound.strong()));
PCollection<Struct> singers =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT SingerID, FirstName, LastName FROM Singers")
            .withTransaction(tx));
PCollection<Struct> albums =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT SingerId, AlbumId, AlbumTitle FROM Albums")
            .withTransaction(tx));

PostgreSQL

SpannerConfig spannerConfig =
    SpannerConfig.create().withInstanceId(instanceId).withDatabaseId(databaseId);
PCollectionView<Transaction> tx =
    pipeline.apply(
        SpannerIO.createTransaction()
            .withSpannerConfig(spannerConfig)
            .withTimestampBound(TimestampBound.strong()));
PCollection<Struct> singers =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT singer_id, first_name, last_name FROM singers")
            .withTransaction(tx));
PCollection<Struct> albums =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT singer_id, album_id, album_title FROM albums")
            .withTransaction(tx));

Membaca data dari semua tabel yang tersedia

Anda dapat membaca data dari semua tabel yang tersedia di database Spanner.

GoogleSQL

PCollection<Struct> allRecords =
    pipeline
        .apply(
            SpannerIO.read()
                .withSpannerConfig(spannerConfig)
                .withBatching(false)
                .withQuery(
                    "SELECT t.table_name FROM information_schema.tables AS t WHERE t"
                        + ".table_catalog = '' AND t.table_schema = ''"))
        .apply(
            MapElements.into(TypeDescriptor.of(ReadOperation.class))
                .via(
                    (SerializableFunction<Struct, ReadOperation>)
                        input -> {
                          String tableName = input.getString(0);
                          return ReadOperation.create().withQuery("SELECT * FROM " + tableName);
                        }))
        .apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));

PostgreSQL

PCollection<Struct> allRecords =
    pipeline
        .apply(
            SpannerIO.read()
                .withSpannerConfig(spannerConfig)
                .withBatching(false)
                .withQuery(
                    Statement.newBuilder(
                            "SELECT t.table_name FROM information_schema.tables AS t "
                                + "WHERE t.table_catalog = $1 AND t.table_schema = $2")
                        .bind("p1")
                        .to(spannerConfig.getDatabaseId().get())
                        .bind("p2")
                        .to("public")
                        .build()))
        .apply(
            MapElements.into(TypeDescriptor.of(ReadOperation.class))
                .via(
                    (SerializableFunction<Struct, ReadOperation>)
                        input -> {
                          String tableName = input.getString(0);
                          return ReadOperation.create()
                              .withQuery("SELECT * FROM \"" + tableName + "\"");
                        }))
        .apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));

Memecahkan masalah kueri yang tidak didukung

Konektor Dataflow hanya mendukung kueri SQL Spanner dengan operator pertama dalam rencana eksekusi kueri adalah Distributed Union. Jika Anda mencoba membaca data dari Spanner menggunakan kueri dan Anda mendapatkan pengecualian yang menyatakan bahwa kueri does not have a DistributedUnion at the root, ikuti langkah-langkah di Memahami cara Spanner menjalankan kueri untuk mengambil rencana eksekusi untuk kueri Anda menggunakan konsol Google Cloud .

Jika kueri SQL Anda tidak didukung, sederhanakan kueri tersebut menjadi kueri yang memiliki union terdistribusi sebagai operator pertama dalam rencana eksekusi kueri. Hapus fungsi agregat, gabungan tabel, serta operator DISTINCT, GROUP BY, dan ORDER, karena operator tersebut adalah operator yang paling mungkin mencegah kueri berfungsi.

Membuat mutasi untuk operasi tulis

Gunakan metode newInsertOrUpdateBuilder class Mutation, bukan metode newInsertBuilder, kecuali jika benar-benar diperlukan untuk pipeline Java. Untuk pipeline Python, gunakan SpannerInsertOrUpdate, bukan SpannerInsert. Dataflow memberikan jaminan minimal satu kali, yang berarti mutasi mungkin ditulis beberapa kali. Akibatnya, hanya mutasi INSERT yang dapat menghasilkan error com.google.cloud.spanner.SpannerException: ALREADY_EXISTS yang menyebabkan pipeline gagal. Untuk mencegah error ini, gunakan mutasi INSERT_OR_UPDATE sebagai gantinya, yang menambahkan baris baru atau memperbarui nilai kolom jika baris sudah ada. Mutasi INSERT_OR_UPDATE dapat diterapkan lebih dari sekali.

Menulis ke Spanner dan mengubah data

Anda dapat menulis data ke Spanner dengan konektor Dataflow menggunakan transformasi SpannerIO.write untuk mengeksekusi kumpulan mutasi baris input. Konektor Dataflow mengelompokkan mutasi ke dalam batch agar efisien.

Contoh berikut menunjukkan cara menerapkan transformasi tulis ke PCollection mutasi:

GoogleSQL

albums
    // Spanner expects a Mutation object, so create it using the Album's data
    .apply("CreateAlbumMutation", ParDo.of(new DoFn<Album, Mutation>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Album album = c.element();
        c.output(Mutation.newInsertOrUpdateBuilder("albums")
            .set("singerId").to(album.singerId)
            .set("albumId").to(album.albumId)
            .set("albumTitle").to(album.albumTitle)
            .build());
      }
    }))
    // Write mutations to Spanner
    .apply("WriteAlbums", SpannerIO.write()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId));

PostgreSQL

PCollectionView<Dialect> dialectView =
    pipeline.apply(Create.of(Dialect.POSTGRESQL)).apply(View.asSingleton());
albums
    // Spanner expects a Mutation object, so create it using the Album's data
    .apply("CreateAlbumMutation", ParDo.of(new DoFn<Album, Mutation>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Album album = c.element();
        c.output(Mutation.newInsertOrUpdateBuilder("albums")
            .set("singerId").to(album.singerId)
            .set("albumId").to(album.albumId)
            .set("albumTitle").to(album.albumTitle)
            .build());
      }
    }))
    // Write mutations to Spanner
    .apply("WriteAlbums", SpannerIO.write()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withDialectView(dialectView));

Jika transformasi tiba-tiba berhenti sebelum selesai, mutasi yang telah diterapkan tidak akan di-roll back.

Menerapkan grup mutasi secara atomik

Anda dapat menggunakan class MutationGroup untuk memastikan bahwa sekelompok mutasi diterapkan bersama secara atomik. Mutasi dalam MutationGroup dijamin akan dikirimkan dalam transaksi yang sama, tetapi transaksi mungkin dicoba lagi.

Grup mutasi berperforma terbaik jika digunakan untuk mengelompokkan mutasi yang memengaruhi data yang disimpan berdekatan dalam ruang kunci. Karena Spanner menyisipkan data tabel induk dan turunan bersama-sama dalam tabel induk, data tersebut selalu berdekatan dalam ruang kunci. Sebaiknya strukturkan grup mutasi Anda sehingga berisi satu mutasi yang diterapkan ke tabel induk dan mutasi tambahan yang diterapkan ke tabel turunan, atau sehingga semua mutasinya mengubah data yang berdekatan dalam ruang kunci. Untuk mengetahui informasi selengkapnya tentang cara Spanner menyimpan data tabel induk dan turunan, lihat Skema dan model data. Jika Anda tidak mengatur grup mutasi di sekitar hierarki tabel yang direkomendasikan, atau jika data yang diakses tidak berdekatan dalam ruang kunci, Spanner mungkin perlu melakukan commit dua fase, yang menghasilkan performa yang lebih lambat. Untuk mengetahui informasi selengkapnya, lihat Trade-off lokalitas.

Untuk menggunakan MutationGroup, buat transformasi SpannerIO.write dan panggil metode SpannerIO.Write.grouped, yang akan menampilkan transformasi yang kemudian dapat Anda terapkan ke PCollection objek MutationGroup.

Saat membuat MutationGroup, mutasi pertama yang tercantum akan menjadi mutasi utama. Jika grup mutasi Anda memengaruhi tabel induk dan turunan, mutasi utama harus berupa mutasi ke tabel induk. Jika tidak, Anda dapat menggunakan mutasi apa pun sebagai mutasi utama. Konektor Dataflow menggunakan mutasi utama untuk menentukan batas partisi agar dapat mengelompokkan mutasi secara efisien.

Misalnya, bayangkan aplikasi Anda memantau perilaku dan menandai perilaku pengguna yang bermasalah untuk ditinjau. Untuk setiap perilaku yang ditandai, Anda ingin memperbarui tabel Users untuk memblokir akses pengguna ke aplikasi Anda, dan Anda juga perlu mencatat insiden tersebut dalam tabel PendingReviews. Untuk memastikan bahwa kedua tabel diperbarui secara atomik, gunakan MutationGroup:

GoogleSQL

PCollection<MutationGroup> mutations =
    suspiciousUserIds.apply(
        MapElements.via(
            new SimpleFunction<>() {

              @Override
              public MutationGroup apply(String userId) {
                // Immediately block the user.
                Mutation userMutation =
                    Mutation.newUpdateBuilder("Users")
                        .set("id")
                        .to(userId)
                        .set("state")
                        .to("BLOCKED")
                        .build();
                long generatedId =
                    Hashing.sha1()
                        .newHasher()
                        .putString(userId, Charsets.UTF_8)
                        .putLong(timestamp.getSeconds())
                        .putLong(timestamp.getNanos())
                        .hash()
                        .asLong();

                // Add an entry to pending review requests.
                Mutation pendingReview =
                    Mutation.newInsertOrUpdateBuilder("PendingReviews")
                        .set("id")
                        .to(generatedId) // Must be deterministically generated.
                        .set("userId")
                        .to(userId)
                        .set("action")
                        .to("REVIEW ACCOUNT")
                        .set("note")
                        .to("Suspicious activity detected.")
                        .build();

                return MutationGroup.create(userMutation, pendingReview);
              }
            }));

mutations.apply(SpannerIO.write()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId)
    .grouped());

PostgreSQL

PCollectionView<Dialect> dialectView =
    pipeline.apply(Create.of(Dialect.POSTGRESQL)).apply(View.asSingleton());
PCollection<MutationGroup> mutations = suspiciousUserIds
    .apply(MapElements.via(new SimpleFunction<String, MutationGroup>() {

      @Override
      public MutationGroup apply(String userId) {
        // Immediately block the user.
        Mutation userMutation = Mutation.newUpdateBuilder("Users")
            .set("id").to(userId)
            .set("state").to("BLOCKED")
            .build();
        long generatedId = Hashing.sha1().newHasher()
            .putString(userId, Charsets.UTF_8)
            .putLong(timestamp.getSeconds())
            .putLong(timestamp.getNanos())
            .hash()
            .asLong();

        // Add an entry to pending review requests.
        Mutation pendingReview = Mutation.newInsertOrUpdateBuilder("PendingReviews")
            .set("id").to(generatedId)  // Must be deterministically generated.
            .set("userId").to(userId)
            .set("action").to("REVIEW ACCOUNT")
            .set("note").to("Suspicious activity detected.")
            .build();

        return MutationGroup.create(userMutation, pendingReview);
      }
    }));

mutations.apply(SpannerIO.write()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId)
    .withDialectView(dialectView)
    .grouped());

Saat membuat grup mutasi, mutasi pertama yang diberikan sebagai argumen akan menjadi mutasi utama. Dalam hal ini, kedua tabel tidak terkait, sehingga tidak ada mutasi utama yang jelas. Kami telah memilih userMutation sebagai yang utama dengan menempatkannya di urutan pertama. Menerapkan kedua mutasi secara terpisah akan lebih cepat, tetapi tidak akan menjamin atomisitas, sehingga grup mutasi adalah pilihan terbaik dalam situasi ini.

Langkah berikutnya