Mengubah data menggunakan operasi tulis batch

Halaman ini menjelaskan permintaan operasi tulis batch Spanner dan cara menggunakannya untuk mengubah data Spanner.

Anda dapat menggunakan penulisan batch Spanner untuk menyisipkan, memperbarui, atau menghapus beberapa baris di tabel Spanner. Tulis batch Spanner mendukung operasi tulis latensi rendah tanpa operasi baca, dan menampilkan respons saat mutasi diterapkan dalam batch. Untuk menggunakan penulisan batch, Anda mengelompokkan mutasi terkait, dan semua mutasi dalam grup di-commit secara atomik. Mutasi di seluruh grup diterapkan dalam urutan yang tidak ditentukan dan tidak saling bergantung (non-atomik). Spanner tidak perlu menunggu semua mutasi diterapkan sebelum mengirim respons, yang berarti bahwa operasi tulis batch memungkinkan kegagalan sebagian. Anda juga dapat menjalankan beberapa operasi tulis batch sekaligus. Untuk mengetahui informasi selengkapnya, lihat Cara menggunakan penulisan batch.

Kasus penggunaan

Penulisan batch Spanner sangat berguna jika Anda ingin melakukan penulisan dalam jumlah besar tanpa operasi baca, tetapi tidak memerlukan transaksi atomik untuk semua mutasi Anda.

Jika Anda ingin mengelompokkan permintaan DML, gunakan DML batch untuk mengubah data Spanner. Untuk mengetahui informasi selengkapnya tentang perbedaan antara DML dan mutasi, lihat Membandingkan DML dan mutasi.

Untuk permintaan mutasi tunggal, sebaiknya gunakan transaksi baca-tulis penguncian.

Batasan

Operasi tulis batch Spanner memiliki batasan berikut:

  • Penulisan batch Spanner tidak tersedia menggunakan Google Cloud konsol atau Google Cloud CLI. API ini hanya tersedia menggunakan API REST dan RPC serta library klien Spanner.

  • Perlindungan replay tidak didukung menggunakan penulisan batch. Mutasi dapat diterapkan lebih dari sekali, dan mutasi yang diterapkan lebih dari sekali dapat menyebabkan kegagalan. Misalnya, jika mutasi penyisipan diputar ulang, mutasi tersebut mungkin menghasilkan error sudah ada, atau jika Anda menggunakan kunci berbasis stempel waktu commit atau yang dihasilkan dalam mutasi, mutasi tersebut dapat menyebabkan baris tambahan ditambahkan ke tabel. Sebaiknya strukturkan operasi tulis Anda agar bersifat idempoten untuk menghindari masalah ini.

  • Anda tidak dapat melakukan rollback permintaan operasi tulis batch yang telah selesai. Anda dapat membatalkan permintaan operasi tulis batch yang sedang berlangsung. Jika Anda membatalkan operasi tulis batch yang sedang berlangsung, mutasi dalam grup yang belum selesai akan di-roll back. Mutasi dalam grup yang telah selesai di-commit ke database.

  • Ukuran maksimum untuk permintaan tulis batch sama dengan batas untuk permintaan commit. Untuk mengetahui informasi selengkapnya, lihat Batas untuk membuat, membaca, memperbarui, dan menghapus data.

Cara menggunakan operasi tulis batch

Untuk menggunakan operasi tulis batch, Anda harus memiliki izin spanner.databases.write di database yang ingin diubah. Anda dapat menulis mutasi secara massal secara non-atomik dalam satu panggilan menggunakan panggilan permintaan REST atau RPC API.

Anda harus mengelompokkan jenis mutasi berikut saat menggunakan operasi tulis batch:

  • Menyisipkan baris dengan awalan kunci utama yang sama di tabel induk dan turunan.
  • Menyisipkan baris ke dalam tabel dengan hubungan kunci asing di antara tabel.
  • Jenis mutasi terkait lainnya bergantung pada skema database dan logika aplikasi Anda.

Anda juga dapat melakukan penulisan secara batch menggunakan library klien Spanner. Contoh kode berikut memperbarui tabel Singers dengan baris baru.

Library klien

Java


import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.MutationGroup;
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerOptions;
import com.google.common.collect.ImmutableList;
import com.google.rpc.Code;
import com.google.spanner.v1.BatchWriteResponse;

public class BatchWriteAtLeastOnceSample {

  /***
   * Assume DDL for the underlying database:
   * <pre>{@code
   *   CREATE TABLE Singers (
   *     SingerId   INT64 NOT NULL,
   *     FirstName  STRING(1024),
   *     LastName   STRING(1024),
   *   ) PRIMARY KEY (SingerId)
   *
   *   CREATE TABLE Albums (
   *     SingerId     INT64 NOT NULL,
   *     AlbumId      INT64 NOT NULL,
   *     AlbumTitle   STRING(1024),
   *   ) PRIMARY KEY (SingerId, AlbumId),
   *   INTERLEAVE IN PARENT Singers ON DELETE CASCADE
   * }</pre>
   */

  private static final MutationGroup MUTATION_GROUP1 =
      MutationGroup.of(
          Mutation.newInsertOrUpdateBuilder("Singers")
              .set("SingerId")
              .to(16)
              .set("FirstName")
              .to("Scarlet")
              .set("LastName")
              .to("Terry")
              .build());
  private static final MutationGroup MUTATION_GROUP2 =
      MutationGroup.of(
          Mutation.newInsertOrUpdateBuilder("Singers")
              .set("SingerId")
              .to(17)
              .set("FirstName")
              .to("Marc")
              .build(),
          Mutation.newInsertOrUpdateBuilder("Singers")
              .set("SingerId")
              .to(18)
              .set("FirstName")
              .to("Catalina")
              .set("LastName")
              .to("Smith")
              .build(),
          Mutation.newInsertOrUpdateBuilder("Albums")
              .set("SingerId")
              .to(17)
              .set("AlbumId")
              .to(1)
              .set("AlbumTitle")
              .to("Total Junk")
              .build(),
          Mutation.newInsertOrUpdateBuilder("Albums")
              .set("SingerId")
              .to(18)
              .set("AlbumId")
              .to(2)
              .set("AlbumTitle")
              .to("Go, Go, Go")
              .build());

  static void batchWriteAtLeastOnce() {
    // TODO(developer): Replace these variables before running the sample.
    final String projectId = "my-project";
    final String instanceId = "my-instance";
    final String databaseId = "my-database";
    batchWriteAtLeastOnce(projectId, instanceId, databaseId);
  }

  static void batchWriteAtLeastOnce(String projectId, String instanceId, String databaseId) {
    try (Spanner spanner =
        SpannerOptions.newBuilder().setProjectId(projectId).build().getService()) {
      DatabaseId dbId = DatabaseId.of(projectId, instanceId, databaseId);
      final DatabaseClient dbClient = spanner.getDatabaseClient(dbId);

      // Creates and issues a BatchWrite RPC request that will apply the mutation groups
      // non-atomically and respond back with a stream of BatchWriteResponse.
      ServerStream<BatchWriteResponse> responses =
          dbClient.batchWriteAtLeastOnce(
              ImmutableList.of(MUTATION_GROUP1, MUTATION_GROUP2),
              Options.tag("batch-write-tag"));

      // Iterates through the results in the stream response and prints the MutationGroup indexes,
      // commit timestamp and status.
      for (BatchWriteResponse response : responses) {
        if (response.getStatus().getCode() == Code.OK_VALUE) {
          System.out.printf(
              "Mutation group indexes %s have been applied with commit timestamp %s",
              response.getIndexesList(), response.getCommitTimestamp());
        } else {
          System.out.printf(
              "Mutation group indexes %s could not be applied with error code %s and "
                  + "error message %s", response.getIndexesList(),
              Code.forNumber(response.getStatus().getCode()), response.getStatus().getMessage());
        }
      }
    }
  }
}

Go


import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/spanner"
	sppb "cloud.google.com/go/spanner/apiv1/spannerpb"
	"google.golang.org/grpc/status"
)

// batchWrite demonstrates writing mutations to a Spanner database through
// BatchWrite API - https://pkg.go.dev/cloud.google.com/go/spanner#Client.BatchWrite
func batchWrite(w io.Writer, db string) error {
	// db := "projects/my-project/instances/my-instance/databases/my-database"
	ctx := context.Background()
	client, err := spanner.NewClient(ctx, db)
	if err != nil {
		return err
	}
	defer client.Close()

	// Database is assumed to exist - https://cloud.google.com/spanner/docs/getting-started/go#create_a_database
	singerColumns := []string{"SingerId", "FirstName", "LastName"}
	albumColumns := []string{"SingerId", "AlbumId", "AlbumTitle"}
	mutationGroups := make([]*spanner.MutationGroup, 2)

	mutationGroup1 := []*spanner.Mutation{
		spanner.InsertOrUpdate("Singers", singerColumns, []interface{}{16, "Scarlet", "Terry"}),
	}
	mutationGroups[0] = &spanner.MutationGroup{Mutations: mutationGroup1}

	mutationGroup2 := []*spanner.Mutation{
		spanner.InsertOrUpdate("Singers", singerColumns, []interface{}{17, "Marc", ""}),
		spanner.InsertOrUpdate("Singers", singerColumns, []interface{}{18, "Catalina", "Smith"}),
		spanner.InsertOrUpdate("Albums", albumColumns, []interface{}{17, 1, "Total Junk"}),
		spanner.InsertOrUpdate("Albums", albumColumns, []interface{}{18, 2, "Go, Go, Go"}),
	}
	mutationGroups[1] = &spanner.MutationGroup{Mutations: mutationGroup2}
	iter := client.BatchWrite(ctx, mutationGroups)
	// See https://pkg.go.dev/cloud.google.com/go/spanner#BatchWriteResponseIterator.Do
	doFunc := func(response *sppb.BatchWriteResponse) error {
		if err = status.ErrorProto(response.GetStatus()); err == nil {
			fmt.Fprintf(w, "Mutation group indexes %v have been applied with commit timestamp %v",
				response.GetIndexes(), response.GetCommitTimestamp())
		} else {
			fmt.Fprintf(w, "Mutation group indexes %v could not be applied with error %v",
				response.GetIndexes(), err)
		}
		// Return an actual error as needed.
		return nil
	}
	return iter.Do(doFunc)
}

Node


// Imports the Google Cloud client library
const {Spanner, MutationGroup} = require('@google-cloud/spanner');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// const instanceId = 'my-instance';
// const databaseId = 'my-database';
// const projectId = 'my-project-id';

// Creates a client
const spanner = new Spanner({
  projectId: projectId,
});

// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);

// Create Mutation Groups
/**
 * Related mutations should be placed in a group, such as insert mutations for both a parent and a child row.
 * A group must contain related mutations.
 * Please see {@link https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#google.spanner.v1.BatchWriteRequest.MutationGroup}
 * for more details and examples.
 */
const mutationGroup1 = new MutationGroup();
mutationGroup1.insert('Singers', {
  SingerId: 1,
  FirstName: 'Scarlet',
  LastName: 'Terry',
});

const mutationGroup2 = new MutationGroup();
mutationGroup2.insert('Singers', {
  SingerId: 2,
  FirstName: 'Marc',
});
mutationGroup2.insert('Singers', {
  SingerId: 3,
  FirstName: 'Catalina',
  LastName: 'Smith',
});
mutationGroup2.insert('Albums', {
  AlbumId: 1,
  SingerId: 2,
  AlbumTitle: 'Total Junk',
});
mutationGroup2.insert('Albums', {
  AlbumId: 2,
  SingerId: 3,
  AlbumTitle: 'Go, Go, Go',
});

const options = {
  transactionTag: 'batch-write-tag',
};

try {
  database
    .batchWriteAtLeastOnce([mutationGroup1, mutationGroup2], options)
    .on('error', console.error)
    .on('data', response => {
      // Check the response code of each response to determine whether the mutation group(s) were applied successfully.
      if (response.status.code === 0) {
        console.log(
          `Mutation group indexes ${
            response.indexes
          }, have been applied with commit timestamp ${Spanner.timestamp(
            response.commitTimestamp,
          ).toJSON()}`,
        );
      }
      // Mutation groups that fail to commit trigger a response with a non-zero status code.
      else {
        console.log(
          `Mutation group indexes ${response.indexes}, could not be applied with error code ${response.status.code}, and error message ${response.status.message}`,
        );
      }
    })
    .on('end', () => {
      console.log('Request completed successfully');
    });
} catch (err) {
  console.log(err);
}

Python

def batch_write(instance_id, database_id):
    """Inserts sample data into the given database via BatchWrite API.

    The database and table must already exist and can be created using
    `create_database`.
    """
    from google.rpc.code_pb2 import OK

    spanner_client = spanner.Client()
    instance = spanner_client.instance(instance_id)
    database = instance.database(database_id)

    with database.mutation_groups() as groups:
        group1 = groups.group()
        group1.insert_or_update(
            table="Singers",
            columns=("SingerId", "FirstName", "LastName"),
            values=[
                (16, "Scarlet", "Terry"),
            ],
        )

        group2 = groups.group()
        group2.insert_or_update(
            table="Singers",
            columns=("SingerId", "FirstName", "LastName"),
            values=[
                (17, "Marc", ""),
                (18, "Catalina", "Smith"),
            ],
        )
        group2.insert_or_update(
            table="Albums",
            columns=("SingerId", "AlbumId", "AlbumTitle"),
            values=[
                (17, 1, "Total Junk"),
                (18, 2, "Go, Go, Go"),
            ],
        )

        for response in groups.batch_write():
            if response.status.code == OK:
                print(
                    "Mutation group indexes {} have been applied with commit timestamp {}".format(
                        response.indexes, response.commit_timestamp
                    )
                )
            else:
                print(
                    "Mutation group indexes {} could not be applied with error {}".format(
                        response.indexes, response.status
                    )
                )

C++

namespace spanner = ::google::cloud::spanner;
// Use upserts as mutation groups are not replay protected.
auto commit_results = client.CommitAtLeastOnce({
    // group #0
    spanner::Mutations{
        spanner::InsertOrUpdateMutationBuilder(
            "Singers", {"SingerId", "FirstName", "LastName"})
            .EmplaceRow(16, "Scarlet", "Terry")
            .Build(),
    },
    // group #1
    spanner::Mutations{
        spanner::InsertOrUpdateMutationBuilder(
            "Singers", {"SingerId", "FirstName", "LastName"})
            .EmplaceRow(17, "Marc", "")
            .EmplaceRow(18, "Catalina", "Smith")
            .Build(),
        spanner::InsertOrUpdateMutationBuilder(
            "Albums", {"SingerId", "AlbumId", "AlbumTitle"})
            .EmplaceRow(17, 1, "Total Junk")
            .EmplaceRow(18, 2, "Go, Go, Go")
            .Build(),
    },
});
for (auto& commit_result : commit_results) {
  if (!commit_result) throw std::move(commit_result).status();
  std::cout << "Mutation group indexes [";
  for (auto index : commit_result->indexes) std::cout << " " << index;
  std::cout << " ]: ";
  if (commit_result->commit_timestamp) {
    auto const& ts = *commit_result->commit_timestamp;
    std::cout << "Committed at " << ts.get<absl::Time>().value();
  } else {
    std::cout << commit_result->commit_timestamp.status();
  }
  std::cout << "\n";
}

Langkah berikutnya