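This page describes Spanner batch write requests and how you can use them to modify your Spanner data.
You can use Spanner batch write to insert, update, or delete multiple rows in your Spanner tables. Batch write supports low-latency writes without a read operation, and it returns responses as mutations are applied in batches. To use batch write, you group related mutations together; all mutations in a group are committed atomically. Mutations across groups are applied in an unspecified order and are independent of one another (non-atomic). Spanner doesn't wait for all mutations to be applied before it sends a response, which means that batch write allows for partial failure. You can also run multiple batch writes at the same time. For more information, see "How to use batch write".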
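Because responses arrive while other groups are still being applied, a caller usually needs to track which groups succeeded and which did not. The following is a minimal sketch of that bookkeeping in plain Python. `GroupResponse` is a hypothetical stand-in for a streamed batch write response (it carries the group indexes and a `google.rpc.Code` number); it is not a client library type.

```python
from dataclasses import dataclass
from typing import Iterable, List, Tuple

OK = 0  # numeric value of google.rpc.Code.OK


@dataclass
class GroupResponse:
    """Hypothetical stand-in for one streamed batch write response."""
    indexes: List[int]  # positions of the mutation groups this response covers
    status_code: int    # 0 means the listed groups were committed
    message: str = ""


def split_results(responses: Iterable[GroupResponse]) -> Tuple[List[int], List[int]]:
    """Return (applied, failed) group indexes from a stream of responses."""
    applied: List[int] = []
    failed: List[int] = []
    for response in responses:
        target = applied if response.status_code == OK else failed
        target.extend(response.indexes)
    return sorted(applied), sorted(failed)


# Simulated stream: groups 0 and 2 commit together, group 1 fails
# with code 6 (ALREADY_EXISTS).
stream = [
    GroupResponse(indexes=[2, 0], status_code=OK),
    GroupResponse(indexes=[1], status_code=6, message="Row already exists"),
]
applied, failed = split_results(stream)
print("applied:", applied, "failed:", failed)
```

The failed indexes identify the groups that an application might inspect or resubmit; whether a retry is safe depends on how the writes are structured.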
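Use cases
Spanner batch write is especially useful when you want to commit a large number of writes without a read operation, and you don't need a single atomic transaction across all of your mutations.
To batch DML requests, use batch DML to modify your Spanner data. For more information about the differences between DML and mutations, see "Comparing DML and mutations".
For single mutation requests, we recommend that you use a locking read-write transaction.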
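Limitations
Spanner batch write has the following limitations:
- Spanner batch write isn't available in the Google Cloud console or the Google Cloud CLI. It is available only through the REST and RPC APIs and the Spanner client libraries.
- Batch write doesn't support replay protection. A mutation might be applied more than once, and a mutation that is applied more than once might fail. For example, a replayed insert mutation might produce an "already exists" error, and a mutation that uses generated or commit-timestamp-based keys might add extra rows to the table. To avoid this issue, we recommend that you structure your writes to be idempotent.
- You can't roll back a completed batch write request. You can cancel an in-progress batch write request. If you cancel an in-progress batch write, mutations in uncompleted groups are rolled back, and mutations in completed groups are committed to the database.
- The maximum size of a batch write request is the same as the limit for a commit request. For more information, see "Limits for creating, reading, updating, and deleting data".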
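The replay limitation can be illustrated with a small in-memory model. The sketch below is not Spanner code: the dictionary `singers` stands in for a table, and `insert`, `insert_or_update`, and `apply_at_least_once` are hypothetical helpers that mimic the documented behavior of insert and insert-or-update mutations when the same write is delivered twice.

```python
class AlreadyExists(Exception):
    """Raised when an insert targets a key that is already present."""


def insert(table: dict, key, row: dict) -> None:
    # Insert fails if the row exists, so a replay surfaces as an error.
    if key in table:
        raise AlreadyExists(f"row {key} already exists")
    table[key] = dict(row)


def insert_or_update(table: dict, key, row: dict) -> None:
    # Upsert overwrites the provided columns and keeps the others,
    # so applying it twice leaves the same final state.
    table[key] = {**table.get(key, {}), **row}


def apply_at_least_once(write, table: dict, key, row: dict, deliveries: int = 2) -> None:
    """Apply the same write several times, as a replay might."""
    for _ in range(deliveries):
        write(table, key, row)


singers = {}
apply_at_least_once(insert_or_update, singers, 16, {"FirstName": "Scarlet", "LastName": "Terry"})
print(len(singers))  # one row, even though the write was delivered twice

try:
    apply_at_least_once(insert, singers, 17, {"FirstName": "Marc"})
except AlreadyExists as err:
    print("replayed insert failed:", err)
```

This is the reason the client library samples on this page use insert-or-update mutations rather than plain inserts.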
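How to use batch write
To use batch write, you must have the spanner.databases.write permission on the database that you want to modify. You can batch write mutations non-atomically in a single call by using a REST or RPC API request.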
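As a rough illustration of what such a request might carry, the sketch below builds a JSON body in Python. The field names (`mutationGroups`, `mutations`, `insertOrUpdate`, `requestOptions.transactionTag`) and the string encoding of INT64 values reflect my reading of the BatchWriteRequest reference and should be treated as assumptions to verify there; `write_mutation` is a hypothetical helper. The body would be sent to the `batchWrite` method of an existing session, which is not shown.

```python
import json


def write_mutation(kind: str, table: str, columns: list, rows: list) -> dict:
    """Hypothetical helper: build one mutation of the given kind."""
    return {kind: {"table": table, "columns": columns, "values": rows}}


request_body = {
    "requestOptions": {"transactionTag": "batch-write-tag"},
    "mutationGroups": [
        {   # group 0: a single singer row
            "mutations": [
                write_mutation(
                    "insertOrUpdate", "Singers",
                    ["SingerId", "FirstName", "LastName"],
                    [["16", "Scarlet", "Terry"]],  # INT64 encoded as a string
                ),
            ]
        },
        {   # group 1: a singer and an interleaved album, committed together
            "mutations": [
                write_mutation(
                    "insertOrUpdate", "Singers",
                    ["SingerId", "FirstName"],
                    [["17", "Marc"]],
                ),
                write_mutation(
                    "insertOrUpdate", "Albums",
                    ["SingerId", "AlbumId", "AlbumTitle"],
                    [["17", "1", "Total Junk"]],
                ),
            ]
        },
    ],
}

print(json.dumps(request_body, indent=2))
```

Each entry in `mutationGroups` is applied atomically, while the two groups are independent of each other.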
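When you use batch write, group the following types of mutations together:
- Inserts of rows that share the same primary key prefix in both the parent and child tables.
- Inserts of rows into tables that have a foreign key relationship between them.
- Other types of related mutations, depending on your database schema and application logic.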
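One possible way to apply the first guideline is to bucket pending writes by their shared key prefix before building mutation groups. The sketch below does this in plain Python; the `(table, row)` tuple form and the `group_by_key_prefix` function are hypothetical, and the input is assumed to list each parent row before its child rows.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

# Hypothetical plain-data form of a pending write: (table name, row values).
Row = Dict[str, object]
PendingWrite = Tuple[str, Row]


def group_by_key_prefix(
    writes: List[PendingWrite], prefix_column: str = "SingerId"
) -> List[List[PendingWrite]]:
    """Bucket writes so rows sharing a key prefix land in the same group."""
    groups: Dict[object, List[PendingWrite]] = defaultdict(list)
    for table, row in writes:
        groups[row[prefix_column]].append((table, row))
    # Dicts preserve insertion order, so groups follow first appearance.
    return list(groups.values())


writes = [
    ("Singers", {"SingerId": 17, "FirstName": "Marc"}),
    ("Singers", {"SingerId": 18, "FirstName": "Catalina", "LastName": "Smith"}),
    ("Albums", {"SingerId": 17, "AlbumId": 1, "AlbumTitle": "Total Junk"}),
    ("Albums", {"SingerId": 18, "AlbumId": 2, "AlbumTitle": "Go, Go, Go"}),
]

for group in group_by_key_prefix(writes):
    print([table for table, _ in group])
```

Each resulting bucket could then be turned into one mutation group, so that a singer and that singer's albums are committed or rejected together. Grouping per singer is only one policy; placing several singers in one group is also valid if they should succeed or fail as a unit.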
You can also batch write by using the Spanner client libraries. The following code examples write new rows to the Singers and Albums tables.
Client libraries
Java
import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.MutationGroup;
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerOptions;
import com.google.common.collect.ImmutableList;
import com.google.rpc.Code;
import com.google.spanner.v1.BatchWriteResponse;

public class BatchWriteAtLeastOnceSample {

  /***
   * Assume DDL for the underlying database:
   * <pre>{@code
   *   CREATE TABLE Singers (
   *     SingerId   INT64 NOT NULL,
   *     FirstName  STRING(1024),
   *     LastName   STRING(1024),
   *   ) PRIMARY KEY (SingerId)
   *
   *   CREATE TABLE Albums (
   *     SingerId     INT64 NOT NULL,
   *     AlbumId      INT64 NOT NULL,
   *     AlbumTitle   STRING(1024),
   *   ) PRIMARY KEY (SingerId, AlbumId),
   *   INTERLEAVE IN PARENT Singers ON DELETE CASCADE
   * }</pre>
   */

  private static final MutationGroup MUTATION_GROUP1 =
      MutationGroup.of(
          Mutation.newInsertOrUpdateBuilder("Singers")
              .set("SingerId")
              .to(16)
              .set("FirstName")
              .to("Scarlet")
              .set("LastName")
              .to("Terry")
              .build());

  private static final MutationGroup MUTATION_GROUP2 =
      MutationGroup.of(
          Mutation.newInsertOrUpdateBuilder("Singers")
              .set("SingerId")
              .to(17)
              .set("FirstName")
              .to("Marc")
              .build(),
          Mutation.newInsertOrUpdateBuilder("Singers")
              .set("SingerId")
              .to(18)
              .set("FirstName")
              .to("Catalina")
              .set("LastName")
              .to("Smith")
              .build(),
          Mutation.newInsertOrUpdateBuilder("Albums")
              .set("SingerId")
              .to(17)
              .set("AlbumId")
              .to(1)
              .set("AlbumTitle")
              .to("Total Junk")
              .build(),
          Mutation.newInsertOrUpdateBuilder("Albums")
              .set("SingerId")
              .to(18)
              .set("AlbumId")
              .to(2)
              .set("AlbumTitle")
              .to("Go, Go, Go")
              .build());

  static void batchWriteAtLeastOnce() {
    // TODO(developer): Replace these variables before running the sample.
    final String projectId = "my-project";
    final String instanceId = "my-instance";
    final String databaseId = "my-database";
    batchWriteAtLeastOnce(projectId, instanceId, databaseId);
  }

  static void batchWriteAtLeastOnce(String projectId, String instanceId, String databaseId) {
    try (Spanner spanner =
        SpannerOptions.newBuilder().setProjectId(projectId).build().getService()) {
      DatabaseId dbId = DatabaseId.of(projectId, instanceId, databaseId);
      final DatabaseClient dbClient = spanner.getDatabaseClient(dbId);

      // Creates and issues a BatchWrite RPC request that will apply the mutation groups
      // non-atomically and respond back with a stream of BatchWriteResponse.
      ServerStream<BatchWriteResponse> responses =
          dbClient.batchWriteAtLeastOnce(
              ImmutableList.of(MUTATION_GROUP1, MUTATION_GROUP2),
              Options.tag("batch-write-tag"));

      // Iterates through the results in the stream response and prints the MutationGroup indexes,
      // commit timestamp and status.
      for (BatchWriteResponse response : responses) {
        if (response.getStatus().getCode() == Code.OK_VALUE) {
          System.out.printf(
              "Mutation group indexes %s have been applied with commit timestamp %s",
              response.getIndexesList(), response.getCommitTimestamp());
        } else {
          System.out.printf(
              "Mutation group indexes %s could not be applied with error code %s and "
                  + "error message %s", response.getIndexesList(),
              Code.forNumber(response.getStatus().getCode()), response.getStatus().getMessage());
        }
      }
    }
  }
}
Go
import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/spanner"
	sppb "cloud.google.com/go/spanner/apiv1/spannerpb"
	"google.golang.org/grpc/status"
)

// batchWrite demonstrates writing mutations to a Spanner database through
// BatchWrite API - https://pkg.go.dev/cloud.google.com/go/spanner#Client.BatchWrite
func batchWrite(w io.Writer, db string) error {
	// db := "projects/my-project/instances/my-instance/databases/my-database"
	ctx := context.Background()
	client, err := spanner.NewClient(ctx, db)
	if err != nil {
		return err
	}
	defer client.Close()

	// Database is assumed to exist - https://cloud.google.com/spanner/docs/getting-started/go#create_a_database
	singerColumns := []string{"SingerId", "FirstName", "LastName"}
	albumColumns := []string{"SingerId", "AlbumId", "AlbumTitle"}
	mutationGroups := make([]*spanner.MutationGroup, 2)

	mutationGroup1 := []*spanner.Mutation{
		spanner.InsertOrUpdate("Singers", singerColumns, []interface{}{16, "Scarlet", "Terry"}),
	}
	mutationGroups[0] = &spanner.MutationGroup{Mutations: mutationGroup1}

	mutationGroup2 := []*spanner.Mutation{
		spanner.InsertOrUpdate("Singers", singerColumns, []interface{}{17, "Marc", ""}),
		spanner.InsertOrUpdate("Singers", singerColumns, []interface{}{18, "Catalina", "Smith"}),
		spanner.InsertOrUpdate("Albums", albumColumns, []interface{}{17, 1, "Total Junk"}),
		spanner.InsertOrUpdate("Albums", albumColumns, []interface{}{18, 2, "Go, Go, Go"}),
	}
	mutationGroups[1] = &spanner.MutationGroup{Mutations: mutationGroup2}

	iter := client.BatchWrite(ctx, mutationGroups)
	// See https://pkg.go.dev/cloud.google.com/go/spanner#BatchWriteResponseIterator.Do
	doFunc := func(response *sppb.BatchWriteResponse) error {
		if err = status.ErrorProto(response.GetStatus()); err == nil {
			fmt.Fprintf(w, "Mutation group indexes %v have been applied with commit timestamp %v",
				response.GetIndexes(), response.GetCommitTimestamp())
		} else {
			fmt.Fprintf(w, "Mutation group indexes %v could not be applied with error %v",
				response.GetIndexes(), err)
		}
		// Return an actual error as needed.
		return nil
	}
	return iter.Do(doFunc)
}
Node.js
// Imports the Google Cloud client library
const {Spanner, MutationGroup} = require('@google-cloud/spanner');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// const instanceId = 'my-instance';
// const databaseId = 'my-database';
// const projectId = 'my-project-id';

// Creates a client
const spanner = new Spanner({
  projectId: projectId,
});

// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);

// Create Mutation Groups
/**
 * Related mutations should be placed in a group, such as insert mutations for both a parent and a child row.
 * A group must contain related mutations.
 * Please see {@link https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#google.spanner.v1.BatchWriteRequest.MutationGroup}
 * for more details and examples.
 */
const mutationGroup1 = new MutationGroup();
mutationGroup1.insert('Singers', {
  SingerId: 1,
  FirstName: 'Scarlet',
  LastName: 'Terry',
});

const mutationGroup2 = new MutationGroup();
mutationGroup2.insert('Singers', {
  SingerId: 2,
  FirstName: 'Marc',
});
mutationGroup2.insert('Singers', {
  SingerId: 3,
  FirstName: 'Catalina',
  LastName: 'Smith',
});
mutationGroup2.insert('Albums', {
  AlbumId: 1,
  SingerId: 2,
  AlbumTitle: 'Total Junk',
});
mutationGroup2.insert('Albums', {
  AlbumId: 2,
  SingerId: 3,
  AlbumTitle: 'Go, Go, Go',
});

const options = {
  transactionTag: 'batch-write-tag',
};

try {
  database
    .batchWriteAtLeastOnce([mutationGroup1, mutationGroup2], options)
    .on('error', console.error)
    .on('data', response => {
      // Check the response code of each response to determine whether the mutation group(s) were applied successfully.
      if (response.status.code === 0) {
        console.log(
          `Mutation group indexes ${
            response.indexes
          }, have been applied with commit timestamp ${Spanner.timestamp(
            response.commitTimestamp,
          ).toJSON()}`,
        );
      }
      // Mutation groups that fail to commit trigger a response with a non-zero status code.
      else {
        console.log(
          `Mutation group indexes ${response.indexes}, could not be applied with error code ${response.status.code}, and error message ${response.status.message}`,
        );
      }
    })
    .on('end', () => {
      console.log('Request completed successfully');
    });
} catch (err) {
  console.log(err);
}
Python
from google.cloud import spanner


def batch_write(instance_id, database_id):
    """Inserts sample data into the given database via BatchWrite API.

    The database and table must already exist and can be created using
    `create_database`.
    """
    from google.rpc.code_pb2 import OK

    spanner_client = spanner.Client()
    instance = spanner_client.instance(instance_id)
    database = instance.database(database_id)

    with database.mutation_groups() as groups:
        # Group 0: a single singer row.
        group1 = groups.group()
        group1.insert_or_update(
            table="Singers",
            columns=("SingerId", "FirstName", "LastName"),
            values=[
                (16, "Scarlet", "Terry"),
            ],
        )

        # Group 1: two singers and their albums, committed together.
        group2 = groups.group()
        group2.insert_or_update(
            table="Singers",
            columns=("SingerId", "FirstName", "LastName"),
            values=[
                (17, "Marc", ""),
                (18, "Catalina", "Smith"),
            ],
        )
        group2.insert_or_update(
            table="Albums",
            columns=("SingerId", "AlbumId", "AlbumTitle"),
            values=[
                (17, 1, "Total Junk"),
                (18, 2, "Go, Go, Go"),
            ],
        )

        # Each streamed response reports the outcome for one or more groups.
        for response in groups.batch_write():
            if response.status.code == OK:
                print(
                    "Mutation group indexes {} have been applied with commit timestamp {}".format(
                        response.indexes, response.commit_timestamp
                    )
                )
            else:
                print(
                    "Mutation group indexes {} could not be applied with error {}".format(
                        response.indexes, response.status
                    )
                )
C++
namespace spanner = ::google::cloud::spanner;
// Use upserts as mutation groups are not replay protected.
auto commit_results = client.CommitAtLeastOnce({
    // group #0
    spanner::Mutations{
        spanner::InsertOrUpdateMutationBuilder(
            "Singers", {"SingerId", "FirstName", "LastName"})
            .EmplaceRow(16, "Scarlet", "Terry")
            .Build(),
    },
    // group #1
    spanner::Mutations{
        spanner::InsertOrUpdateMutationBuilder(
            "Singers", {"SingerId", "FirstName", "LastName"})
            .EmplaceRow(17, "Marc", "")
            .EmplaceRow(18, "Catalina", "Smith")
            .Build(),
        spanner::InsertOrUpdateMutationBuilder(
            "Albums", {"SingerId", "AlbumId", "AlbumTitle"})
            .EmplaceRow(17, 1, "Total Junk")
            .EmplaceRow(18, 2, "Go, Go, Go")
            .Build(),
    },
});
for (auto& commit_result : commit_results) {
  if (!commit_result) throw std::move(commit_result).status();
  std::cout << "Mutation group indexes [";
  for (auto index : commit_result->indexes) std::cout << " " << index;
  std::cout << " ]: ";
  if (commit_result->commit_timestamp) {
    auto const& ts = *commit_result->commit_timestamp;
    std::cout << "Committed at " << ts.get<absl::Time>().value();
  } else {
    std::cout << commit_result->commit_timestamp.status();
  }
  std::cout << "\n";
}
What's next
- Learn more about Spanner transactions.