トランザクションの概要

このページでは、Spanner のトランザクションについて説明し、Spanner のトランザクション(読み取り / 書き込み、読み取り専用、パーティション化された DML)のインターフェースについて紹介します。

Spanner 内のトランザクションは、データベース内の列、行、およびテーブルにまたがる時間内の単一論理ポイントにおいてアトミックに実行される読み取りと書き込みのセットです。

セッションは、Spanner データベースでトランザクションを実行するために使用されます。セッションとは、Spanner データベース サービスとの論理的な通信チャネルを表します。セッションで一度に実行されるトランザクションは 1 つまたは複数です。詳細については、セッションをご覧ください。

トランザクション タイプ

Spanner は、次のトランザクション タイプをサポートしています。各トランザクション タイプは、特定のデータ操作パターン用にデザインされています。

  • 読み取り / 書き込み: これらのトランザクションは悲観的ロックを使用し、必要に応じて 2 フェーズ commit を行います。失敗して再試行が必要になる場合があります。単一のデータベースに限定されますが、そのデータベース内の複数のテーブルにわたってデータを変更できます。

  • 読み取り専用: これらのトランザクションは、複数の読み取りオペレーションにわたってデータの整合性を保証しますが、データの変更は許可しません。整合性を保つためにシステムが決定したタイムスタンプで実行されるか、ユーザーが構成した過去のタイムスタンプで実行されます。読み取り / 書き込みトランザクションとは異なり、commit オペレーションやロックは必要ありませんが、進行中の書き込みオペレーションが完了するまで一時停止することがあります。

  • パーティション化 DML: このトランザクション タイプでは、DML ステートメントをパーティション化 DML オペレーションとして実行します。データ クリーンアップや一括データ挿入など、大規模なデータの更新や削除に最適化されています。アトミック トランザクションを必要としない多数の書き込みには、バッチ書き込みの使用を検討してください。詳細については、バッチ書き込みを使用してデータを変更するをご覧ください。

読み取り / 書き込みトランザクション

ロック読み取り / 書き込みトランザクションを使用して、データベース内の任意の場所でデータをアトミックに読み取り、変更、書き込みします。この型のトランザクションは外部一貫性を保ちます。

トランザクションがアクティブな時間を最小限に抑えます。トランザクションの期間を短くすると、commit が成功する確率が高くなり、競合が減少します。Spanner は、トランザクションが読み取りを継続し、sessions.commit または sessions.rollback オペレーションでトランザクションが終了していない限り、読み取りロックをアクティブに保とうとします。クライアントが長期間非アクティブな状態が続くと、Spanner はトランザクションのロックを解除してトランザクションを中止することがあります。

概念的には、読み取り / 書き込みトランザクションは、0 回以上の読み取りまたは SQL ステートメントの後に sessions.commit が続く形で構成されます。sessions.commit の前であれば、クライアントは sessions.rollback リクエストを送信してトランザクションを中止できます。

1 つ以上の読み取りオペレーションに依存する書き込みオペレーションを実行するには、ロック読み取り / 書き込みトランザクションを使用します。

  • 1 つ以上の書き込みオペレーションをアトミックに commit する必要がある場合は、同じ読み取り / 書き込みトランザクション内で書き込みを実行します。たとえば、口座 A から口座 B に $200 を振り込む場合は、両方の書き込みオペレーション(口座 A から $200 を減らし、口座 B に $200 を増やす)と、初期の口座残高の読み取りを同じトランザクション内で行います。
  • 口座 A の残高を倍にする場合は、同じトランザクション内で読み取りと書き込みのオペレーションを実行します。これにより、システムが残高を読み取ってから、残高を 2 倍にして更新します。
  • 1 つ以上の読み取りオペレーションの結果に依存する 1 つ以上の書き込みオペレーションを実行する可能性がある場合は、書き込みオペレーションが実行されない場合でも、これらの書き込みと読み取りを同じ読み取り / 書き込みトランザクションで実行します。たとえば、口座 A の現在の残高が $500 より多い場合にのみ、口座 A から口座 B に $200 を振り込む場合は、振込が行われなくても、口座 A の残高の読み取りと条件付き書き込みオペレーションを同じトランザクション内に含めます。

読み取りオペレーションを実行するには、単一読み取りメソッドまたは読み取り専用トランザクションを使用します。

  • 読み取りオペレーションのみを実行し、単一読み取りメソッドを使用して読み取りオペレーションを表現できる場合は、その単一読み取りメソッドまたは読み取り専用トランザクションを使用します。読み取り / 書き込みトランザクションとは異なり、単一読み取りはロックを取得しません。

インターフェース

Spanner クライアント ライブラリには、トランザクションが中止した場合は再試行をしながら、読み取り / 書き込みトランザクション内で一連の作業を実行するためのインターフェースが用意されています。Spanner トランザクションでは、commit する前に複数回の再試行が必要になることがあります。

トランザクションが中止される原因はいくつかあります。たとえば、2 つのトランザクションが同時にデータを変更しようとすると、デッドロックが発生する可能性があります。このような場合、Spanner は一方のトランザクションを中止して、もう一方のトランザクションを続行させます。まれに、Spanner 内の一時的なイベントがトランザクションの中止を引き起こすこともあります。

トランザクションはアトミックなので、中止されたトランザクションはデータベースに影響しません。同じセッション内でトランザクションを再試行して、成功率を高めます。ABORTED エラーが発生するたびに、トランザクションのロック優先度が上がります。

Spanner クライアント ライブラリでトランザクションを使用するときは、トランザクションの本体を関数オブジェクトとして定義します。この関数は、1 つ以上のデータベース テーブルに対して実行される読み取りと書き込みをカプセル化します。Spanner クライアント ライブラリは、トランザクションが正常に commit されるか、再試行できないエラーが発生するまで、この関数を繰り返し実行します。

Albums テーブルMarketingBudget 列があるとします。

CREATE TABLE Albums (
  SingerId        INT64 NOT NULL,
  AlbumId         INT64 NOT NULL,
  AlbumTitle      STRING(MAX),
  MarketingBudget INT64
) PRIMARY KEY (SingerId, AlbumId);

マーケティング部門から、Albums (2, 2) の予算から Albums (1, 1) に $200,000 を移す(ただし、そのアルバムの予算からその金額を使用できる場合に限る)ように依頼があったとします。このオペレーションでは、トランザクションは、読み取りの結果により書き込みを行う可能性があるので、ロック読み取り / 書き込みトランザクションを使用する必要があります。

以下に示すのは、読み書きトランザクションを実行する方法です。

C++

void ReadWriteTransaction(google::cloud::spanner::Client client) {
  namespace spanner = ::google::cloud::spanner;
  using ::google::cloud::StatusOr;

  // A helper to read a single album MarketingBudget.
  auto get_current_budget =
      [](spanner::Client client, spanner::Transaction txn,
         std::int64_t singer_id,
         std::int64_t album_id) -> StatusOr<std::int64_t> {
    auto key = spanner::KeySet().AddKey(spanner::MakeKey(singer_id, album_id));
    auto rows = client.Read(std::move(txn), "Albums", std::move(key),
                            {"MarketingBudget"});
    using RowType = std::tuple<std::int64_t>;
    auto row = spanner::GetSingularRow(spanner::StreamOf<RowType>(rows));
    if (!row) return std::move(row).status();
    return std::get<0>(*std::move(row));
  };

  auto commit = client.Commit(
      [&client, &get_current_budget](
          spanner::Transaction const& txn) -> StatusOr<spanner::Mutations> {
        auto b1 = get_current_budget(client, txn, 1, 1);
        if (!b1) return std::move(b1).status();
        auto b2 = get_current_budget(client, txn, 2, 2);
        if (!b2) return std::move(b2).status();
        std::int64_t transfer_amount = 200000;

        return spanner::Mutations{
            spanner::UpdateMutationBuilder(
                "Albums", {"SingerId", "AlbumId", "MarketingBudget"})
                .EmplaceRow(1, 1, *b1 + transfer_amount)
                .EmplaceRow(2, 2, *b2 - transfer_amount)
                .Build()};
      });

  if (!commit) throw std::move(commit).status();
  std::cout << "Transfer was successful [spanner_read_write_transaction]\n";
}

C#


using Google.Cloud.Spanner.Data;
using System;
using System.Threading.Tasks;
using System.Transactions;

public class ReadWriteWithTransactionAsyncSample
{
    public async Task<int> ReadWriteWithTransactionAsync(string projectId, string instanceId, string databaseId)
    {
        // This sample transfers 200,000 from the MarketingBudget
        // field of the second Album to the first Album. Make sure to run
        // the Add Column and Write Data To New Column samples first,
        // in that order.

        string connectionString = $"Data Source=projects/{projectId}/instances/{instanceId}/databases/{databaseId}";

        using TransactionScope scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled);
        decimal transferAmount = 200000;
        decimal secondBudget = 0;
        decimal firstBudget = 0;

        using var connection = new SpannerConnection(connectionString);
        using var cmdLookup1 = connection.CreateSelectCommand("SELECT * FROM Albums WHERE SingerId = 2 AND AlbumId = 2");

        using (var reader = await cmdLookup1.ExecuteReaderAsync())
        {
            while (await reader.ReadAsync())
            {
                // Read the second album's budget.
                secondBudget = reader.GetFieldValue<decimal>("MarketingBudget");
                // Confirm second Album's budget is sufficient and
                // if not raise an exception. Raising an exception
                // will automatically roll back the transaction.
                if (secondBudget < transferAmount)
                {
                    throw new Exception($"The second album's budget {secondBudget} is less than the amount to transfer.");
                }
            }
        }

        // Read the first album's budget.
        using var cmdLookup2 = connection.CreateSelectCommand("SELECT * FROM Albums WHERE SingerId = 1 and AlbumId = 1");
        using (var reader = await cmdLookup2.ExecuteReaderAsync())
        {
            while (await reader.ReadAsync())
            {
                firstBudget = reader.GetFieldValue<decimal>("MarketingBudget");
            }
        }

        // Specify update command parameters.
        using var cmdUpdate = connection.CreateUpdateCommand("Albums", new SpannerParameterCollection
        {
            { "SingerId", SpannerDbType.Int64 },
            { "AlbumId", SpannerDbType.Int64 },
            { "MarketingBudget", SpannerDbType.Int64 },
        });

        // Update second album to remove the transfer amount.
        secondBudget -= transferAmount;
        cmdUpdate.Parameters["SingerId"].Value = 2;
        cmdUpdate.Parameters["AlbumId"].Value = 2;
        cmdUpdate.Parameters["MarketingBudget"].Value = secondBudget;
        var rowCount = await cmdUpdate.ExecuteNonQueryAsync();

        // Update first album to add the transfer amount.
        firstBudget += transferAmount;
        cmdUpdate.Parameters["SingerId"].Value = 1;
        cmdUpdate.Parameters["AlbumId"].Value = 1;
        cmdUpdate.Parameters["MarketingBudget"].Value = firstBudget;
        rowCount += await cmdUpdate.ExecuteNonQueryAsync();
        scope.Complete();
        Console.WriteLine("Transaction complete.");
        return rowCount;
    }
}

Go


import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/spanner"
)

func writeWithTransaction(w io.Writer, db string) error {
	ctx := context.Background()
	client, err := spanner.NewClient(ctx, db)
	if err != nil {
		return err
	}
	defer client.Close()

	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
		getBudget := func(key spanner.Key) (int64, error) {
			row, err := txn.ReadRow(ctx, "Albums", key, []string{"MarketingBudget"})
			if err != nil {
				return 0, err
			}
			var budget int64
			if err := row.Column(0, &budget); err != nil {
				return 0, err
			}
			return budget, nil
		}
		album2Budget, err := getBudget(spanner.Key{2, 2})
		if err != nil {
			return err
		}
		const transferAmt = 200000
		if album2Budget >= transferAmt {
			album1Budget, err := getBudget(spanner.Key{1, 1})
			if err != nil {
				return err
			}
			album1Budget += transferAmt
			album2Budget -= transferAmt
			cols := []string{"SingerId", "AlbumId", "MarketingBudget"}
			txn.BufferWrite([]*spanner.Mutation{
				spanner.Update("Albums", cols, []interface{}{1, 1, album1Budget}),
				spanner.Update("Albums", cols, []interface{}{2, 2, album2Budget}),
			})
			fmt.Fprintf(w, "Moved %d from Album2's MarketingBudget to Album1's.", transferAmt)
		}
		return nil
	})
	return err
}

Java

static void writeWithTransaction(DatabaseClient dbClient) {
  dbClient
      .readWriteTransaction()
      .run(transaction -> {
        // Transfer marketing budget from one album to another. We do it in a transaction to
        // ensure that the transfer is atomic.
        Struct row =
            transaction.readRow("Albums", Key.of(2, 2), Arrays.asList("MarketingBudget"));
        long album2Budget = row.getLong(0);
        // Transaction will only be committed if this condition still holds at the time of
        // commit. Otherwise it will be aborted and the callable will be rerun by the
        // client library.
        long transfer = 200000;
        if (album2Budget >= transfer) {
          long album1Budget =
              transaction
                  .readRow("Albums", Key.of(1, 1), Arrays.asList("MarketingBudget"))
                  .getLong(0);
          album1Budget += transfer;
          album2Budget -= transfer;
          transaction.buffer(
              Mutation.newUpdateBuilder("Albums")
                  .set("SingerId")
                  .to(1)
                  .set("AlbumId")
                  .to(1)
                  .set("MarketingBudget")
                  .to(album1Budget)
                  .build());
          transaction.buffer(
              Mutation.newUpdateBuilder("Albums")
                  .set("SingerId")
                  .to(2)
                  .set("AlbumId")
                  .to(2)
                  .set("MarketingBudget")
                  .to(album2Budget)
                  .build());
        }
        return null;
      });
}

Node.js

// This sample transfers 200,000 from the MarketingBudget field
// of the second Album to the first Album, as long as the second
// Album has enough money in its budget. Make sure to run the
// addColumn and updateData samples first (in that order).

// Imports the Google Cloud client library
const {Spanner} = require('@google-cloud/spanner');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// const projectId = 'my-project-id';
// const instanceId = 'my-instance';
// const databaseId = 'my-database';

// Creates a client
const spanner = new Spanner({
  projectId: projectId,
});

// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);

const transferAmount = 200000;

// Note: the `runTransaction()` method is non blocking and returns "void".
// For sequential execution of the transaction use `runTransactionAsync()` method which returns a promise.
// For example: await database.runTransactionAsync(async (err, transaction) => { ... })
database.runTransaction(async (err, transaction) => {
  if (err) {
    console.error(err);
    return;
  }
  let firstBudget, secondBudget;
  const queryOne = {
    columns: ['MarketingBudget'],
    keys: [[2, 2]], // SingerId: 2, AlbumId: 2
  };

  const queryTwo = {
    columns: ['MarketingBudget'],
    keys: [[1, 1]], // SingerId: 1, AlbumId: 1
  };

  Promise.all([
    // Reads the second album's budget
    transaction.read('Albums', queryOne).then(results => {
      // Gets second album's budget
      const rows = results[0].map(row => row.toJSON());
      secondBudget = rows[0].MarketingBudget;
      console.log(`The second album's marketing budget: ${secondBudget}`);

      // Makes sure the second album's budget is large enough
      if (secondBudget < transferAmount) {
        throw new Error(
          `The second album's budget (${secondBudget}) is less than the transfer amount (${transferAmount}).`,
        );
      }
    }),

    // Reads the first album's budget
    transaction.read('Albums', queryTwo).then(results => {
      // Gets first album's budget
      const rows = results[0].map(row => row.toJSON());
      firstBudget = rows[0].MarketingBudget;
      console.log(`The first album's marketing budget: ${firstBudget}`);
    }),
  ])
    .then(() => {
      console.log(firstBudget, secondBudget);
      // Transfers the budgets between the albums
      firstBudget += transferAmount;
      secondBudget -= transferAmount;

      console.log(firstBudget, secondBudget);

      // Updates the database
      // Note: Cloud Spanner interprets Node.js numbers as FLOAT64s, so they
      // must be converted (back) to strings before being inserted as INT64s.
      transaction.update('Albums', [
        {
          SingerId: '1',
          AlbumId: '1',
          MarketingBudget: firstBudget.toString(),
        },
        {
          SingerId: '2',
          AlbumId: '2',
          MarketingBudget: secondBudget.toString(),
        },
      ]);
    })
    .then(() => {
      // Commits the transaction and send the changes to the database
      return transaction.commit();
    })
    .then(() => {
      console.log(
        `Successfully executed read-write transaction to transfer ${transferAmount} from Album 2 to Album 1.`,
      );
    })
    .catch(err => {
      console.error('ERROR:', err);
    })
    .then(() => {
      transaction.end();
      // Closes the database when finished
      return database.close();
    });
});

PHP

use Google\Cloud\Spanner\SpannerClient;
use Google\Cloud\Spanner\Transaction;
use UnexpectedValueException;

/**
 * Performs a read-write transaction to update two sample records in the
 * database.
 *
 * This will transfer 200,000 from the `MarketingBudget` field for the second
 * Album to the first Album. If the `MarketingBudget` for the second Album is
 * too low, it will raise an exception.
 *
 * Before running this sample, you will need to run the `update_data` sample
 * to populate the fields.
 * Example:
 * ```
 * read_write_transaction($instanceId, $databaseId);
 * ```
 *
 * @param string $instanceId The Spanner instance ID.
 * @param string $databaseId The Spanner database ID.
 */
function read_write_transaction(string $instanceId, string $databaseId): void
{
    $spanner = new SpannerClient();
    $instance = $spanner->instance($instanceId);
    $database = $instance->database($databaseId);

    $database->runTransaction(function (Transaction $t) use ($spanner) {
        $transferAmount = 200000;

        // Read the second album's budget.
        $secondAlbumKey = [2, 2];
        $secondAlbumKeySet = $spanner->keySet(['keys' => [$secondAlbumKey]]);
        $secondAlbumResult = $t->read(
            'Albums',
            $secondAlbumKeySet,
            ['MarketingBudget'],
            ['limit' => 1]
        );

        $firstRow = $secondAlbumResult->rows()->current();
        $secondAlbumBudget = $firstRow['MarketingBudget'];
        if ($secondAlbumBudget < $transferAmount) {
            // Throwing an exception will automatically roll back the transaction.
            throw new UnexpectedValueException(
                'The second album\'s budget is lower than the transfer amount: ' . $transferAmount
            );
        }

        $firstAlbumKey = [1, 1];
        $firstAlbumKeySet = $spanner->keySet(['keys' => [$firstAlbumKey]]);
        $firstAlbumResult = $t->read(
            'Albums',
            $firstAlbumKeySet,
            ['MarketingBudget'],
            ['limit' => 1]
        );

        // Read the first album's budget.
        $firstRow = $firstAlbumResult->rows()->current();
        $firstAlbumBudget = $firstRow['MarketingBudget'];

        // Update the budgets.
        $secondAlbumBudget -= $transferAmount;
        $firstAlbumBudget += $transferAmount;
        printf('Setting first album\'s budget to %s and the second album\'s ' .
            'budget to %s.' . PHP_EOL, $firstAlbumBudget, $secondAlbumBudget);

        // Update the rows.
        $t->updateBatch('Albums', [
            ['SingerId' => 1, 'AlbumId' => 1, 'MarketingBudget' => $firstAlbumBudget],
            ['SingerId' => 2, 'AlbumId' => 2, 'MarketingBudget' => $secondAlbumBudget],
        ]);

        // Commit the transaction!
        $t->commit();

        print('Transaction complete.' . PHP_EOL);
    });
}

Python

def read_write_transaction(instance_id, database_id):
    """Performs a read-write transaction to update two sample records in the
    database.

    This will transfer 200,000 from the `MarketingBudget` field for the second
    Album to the first Album. If the `MarketingBudget` is too low, it will
    raise an exception.

    Before running this sample, you will need to run the `update_data` sample
    to populate the fields.
    """
    spanner_client = spanner.Client()
    instance = spanner_client.instance(instance_id)
    database = instance.database(database_id)

    def update_albums(transaction):
        # Read the second album budget.
        second_album_keyset = spanner.KeySet(keys=[(2, 2)])
        second_album_result = transaction.read(
            table="Albums",
            columns=("MarketingBudget",),
            keyset=second_album_keyset,
            limit=1,
        )
        second_album_row = list(second_album_result)[0]
        second_album_budget = second_album_row[0]

        transfer_amount = 200000

        if second_album_budget < transfer_amount:
            # Raising an exception will automatically roll back the
            # transaction.
            raise ValueError("The second album doesn't have enough funds to transfer")

        # Read the first album's budget.
        first_album_keyset = spanner.KeySet(keys=[(1, 1)])
        first_album_result = transaction.read(
            table="Albums",
            columns=("MarketingBudget",),
            keyset=first_album_keyset,
            limit=1,
        )
        first_album_row = list(first_album_result)[0]
        first_album_budget = first_album_row[0]

        # Update the budgets.
        second_album_budget -= transfer_amount
        first_album_budget += transfer_amount
        print(
            "Setting first album's budget to {} and the second album's "
            "budget to {}.".format(first_album_budget, second_album_budget)
        )

        # Update the rows.
        transaction.update(
            table="Albums",
            columns=("SingerId", "AlbumId", "MarketingBudget"),
            values=[(1, 1, first_album_budget), (2, 2, second_album_budget)],
        )

    database.run_in_transaction(update_albums)

    print("Transaction complete.")

Ruby

# project_id  = "Your Google Cloud project ID"
# instance_id = "Your Spanner instance ID"
# database_id = "Your Spanner database ID"

require "google/cloud/spanner"

spanner         = Google::Cloud::Spanner.new project: project_id
client          = spanner.client instance_id, database_id
transfer_amount = 200_000

client.transaction do |transaction|
  first_album  = transaction.read("Albums", [:MarketingBudget], keys: [[1, 1]]).rows.first
  second_album = transaction.read("Albums", [:MarketingBudget], keys: [[2, 2]]).rows.first

  raise "The second album does not have enough funds to transfer" if second_album[:MarketingBudget] < transfer_amount

  new_first_album_budget  = first_album[:MarketingBudget] + transfer_amount
  new_second_album_budget = second_album[:MarketingBudget] - transfer_amount

  transaction.update "Albums", [
    { SingerId: 1, AlbumId: 1, MarketingBudget: new_first_album_budget  },
    { SingerId: 2, AlbumId: 2, MarketingBudget: new_second_album_budget }
  ]
end

puts "Transaction complete"

セマンティクス

このセクションでは、Spanner の読み取り / 書き込みトランザクションのセマンティクスについて説明します。

プロパティ

Spanner 内の読み取り / 書き込みトランザクションは、一連の読み取りと書き込みをアトミックに実行します。読み取り / 書き込みトランザクションが実行されるタイムスタンプは経過時間と一致します。シリアル化の順序は、このタイムスタンプの順序と一致します。

読み取り / 書き込みトランザクションには、リレーショナル データベースの ACID プロパティが用意されています。Spanner の読み取り / 書き込みトランザクションは、一般的な ACID よりも強力なプロパティを備えています。

これらのプロパティのため、アプリケーションのデベロッパーは、同時に実行される可能性がある他のトランザクションからその実行を保護することに煩わされることなく、独自に各トランザクションの正確さのみにフォーカスできます。

読み取り / 書き込みトランザクションの分離

一連の読み取りと書き込みを含むトランザクションが正常に commit されると、次のようになります。

  • トランザクションは、トランザクションの commit タイムスタンプで取得された一貫性のあるスナップショットを反映する値を返します。
  • commit 時に空の行または範囲が空のままになっている。
  • トランザクションは、トランザクションの commit タイムスタンプで、すべての書き込みを commit します。
  • トランザクションが commit されるまで、どのトランザクションも書き込みを認識できません。

Spanner クライアント ドライバには、トランザクションを再実行し、クライアントが観測したデータを検証することで一時的なエラーをマスクするトランザクションの再試行ロジックが含まれています。

トランザクション自体の視点からも、Spanner データベースの他のリーダーおよびライターの視点からも、すべての読み取りおよび書き込みが単一の時点で発生しているように見えるという効果があります。 つまり、読み取りと書き込みは同じタイムスタンプで行われます。例については、直列化可能性と外部整合性をご覧ください。

読み取りトランザクションの分離

読み取り / 書き込みトランザクションが読み取りオペレーションのみを実行する場合、読み取り専用トランザクションと同様の一貫性保証が提供されます。トランザクション内のすべての読み取りは、存在しない行の確認を含め、一貫性のあるタイムスタンプからデータを返します。

異なるのは、書き込みオペレーションを実行せずに読み取り / 書き込みトランザクションが commit される場合です。このシナリオでは、トランザクション内で読み取られたデータが、読み取りオペレーションとトランザクションの commit の間でデータベース内で変更されていないことは保証されません。

データの鮮度を確保し、最後に取得されてからデータが変更されていないことを検証するには、後続の読み取りが必要です。この再読み取りは、別の読み取り / 書き込みトランザクション内または強力な読み取りで実行できます。

効率を最大限に高めるため、トランザクションが読み取りのみを実行する場合は、読み取り / 書き込みトランザクションではなく読み取り専用トランザクションを使用します。

アトミック性、整合性、耐久性

分離に加えて、Spanner には他の ACID プロパティの保証があります。

  • アトミック性: トランザクション内のすべてのオペレーションが正常に完了するか、またはどのオペレーションも実行されない場合、そのトランザクションはアトミックと見なされます。トランザクション内のオペレーションが失敗すると、トランザクション全体が元の状態にロールバックされ、データの完全性が確保されます。
  • 整合性: トランザクションは、データベースのルールと制約の完全性を維持する必要があります。トランザクションが完了すると、データベースは事前定義されたルールに準拠した有効な状態になります。
  • 耐久性: トランザクションが commit されると、その変更はデータベースに永続的に保存され、システム障害、停電、その他の障害が発生した場合でも保持されます。

直列化可能性と外部整合性

Spanner には、直列化可能性外部整合性など、強力なトランザクション保証が用意されています。これらのプロパティにより、分散環境でもデータの一貫性が維持され、操作が予測可能な順序で実行されます。

直列化可能性により、トランザクションが同時に処理されても、すべてが単一の順序で連続して実行されるように見えます。Spanner は、commit された順序を反映した commit タイムスタンプをトランザクションに割り当てることで、これを実現します。

Spanner には、外部整合性と呼ばれるさらに強力な保証があります。つまり、トランザクションは commit タイムスタンプに反映された順序で commit されるだけでなく、これらのタイムスタンプは実際の時間とも一致します。これにより、commit タイムスタンプをリアルタイムで比較し、データの一貫性のあるグローバルな順序付けられたビューを提供できます。

つまり、トランザクション Txn1 が別のトランザクション Txn2 より前にリアルタイムで commit されると、Txn1 の commit タイムスタンプは Txn2 の commit タイムスタンプより前になります。

次に例を示します。

同じデータを読み取る 2 つのトランザクションの実行を示しているタイムライン

このシナリオでは、タイムライン t の間:

  • トランザクション Txn1 は、データ A を読み取り、A への書き込みをステージングして、正常に commit します。
  • トランザクション Txn2Txn1 の開始後に開始します。データ B を読み取り、次にデータ A を読み取ります。

Txn2 は Txn1 の完了前に開始されましたが、Txn2Txn1A に行った変更を認識します。これは、Txn1A への書き込みを commit した後に Txn2A を読み取るためです。

Txn1Txn2 の実行時間は重複する可能性がありますが、それぞれの commit タイムスタンプである c1c2 によって、トランザクションの線形順序が適用されます。具体的には、次のようになります。

  • Txn1 内のすべての読み取りと書き込みが、単一の時点 c1 で発生したように見える。
  • Txn2 内のすべての読み取りと書き込みが、単一の時点 c2 で発生したように見える。
  • 重要なのは、書き込みが異なるマシンで発生した場合でも、commit された書き込みの c1c2 よりも前になることです。Txn2 が読み取りのみを実行する場合、c1c2 より前または同時です。

この強い順序付けにより、後続の読み取りオペレーションで Txn2 の影響が確認された場合、Txn1 の影響も確認されます。このプロパティは、正常に commit されたすべてのトランザクションに対して true を保持します。

トランザクション失敗時の読み取りと書き込みの保証

トランザクション実行の呼び出しが失敗した場合、読み取りと書き込みの保証は、基礎となるコミット呼び出しがどのようなエラーで失敗したかによって異なります。

たとえば、「行が見つかりません」、「行はすでに存在します」などのエラーは、バッファリングされたミューテーションの書き込みで、クライアントがアップデートを試みた行が存在しないなど、なんらかのエラーに遭遇したことを意味します。このような場合、読み取りは整合性があることが保証され、書き込みは該当せず、行の不在は読み取りと整合性があることが保証されます。

トランザクション失敗時の読み取りと書き込みの保証

Spanner トランザクションが失敗した場合、読み取りと書き込みに対して受け取る保証は、commit オペレーション中に発生した特定のエラーによって異なります。

たとえば、「行が見つかりません」、「行はすでに存在します」などのエラー メッセージは、バッファリングされたミューテーションの書き込み中に問題が発生したことを示します。これは、たとえば、クライアントが更新しようとしている行が存在しない場合に発生する可能性があります。これらのシナリオでは:

  • 読み取りが一貫している: トランザクション中に読み取られたデータは、エラーが発生した時点まで一貫性が保証されます。
  • 書き込みが適用されない: トランザクションが試みたミューテーションはデータベースに commit されません。
  • 行の整合性: エラーをトリガーした行の非存在(または存在状態)は、トランザクション内で実行された読み取りと一致しています。

Spanner では、同じトランザクション内の他の進行中のオペレーションに影響を与えることなく、非同期読み取りオペレーションをいつでもキャンセルできます。この柔軟性は、上位レベルのオペレーションがキャンセルされた場合や、初期結果に基づいて読み取りを中止する場合に便利です。

ただし、読み取りのキャンセルをリクエストしても、すぐに終了するとは限りません。キャンセル リクエストの後でも、読み取りオペレーションは次のようになる可能性があります。

  • 正常に完了: キャンセルが有効になる前に、読み取りが処理を完了して結果を返すことがあります。
  • 別の理由で失敗: 読み取りが中止などの別のエラーで終了する可能性があります。
  • 不完全な結果を返す: 読み取りで部分的な結果が返されることがあります。この結果は、トランザクションの commit プロセスの一部として検証されます。

トランザクションの commit オペレーションとの違いにも注意してください。トランザクションがすでに commit されているか、別の理由で失敗している場合を除き、commit をキャンセルするとトランザクション全体が中止されます。

パフォーマンス

このセクションでは、読み取り / 書き込みトランザクションのパフォーマンスに影響する問題について説明します。

ロック同時実行制御

Spanner では、複数のクライアントが同時に同じデータベースと対話できます。これらの同時実行トランザクション間でデータの整合性を維持するために、Spanner には共有ロックと独占的ロックの両方を使用するロック メカニズムがあります。

トランザクションが読み取りオペレーションを実行すると、Spanner は関連するデータに対して共有読み取りロックを取得します。これらの共有ロックにより、他の同時読み取りオペレーションが同じデータにアクセスできます。この同時実行は、トランザクションが変更の commit を準備するまで維持されます。

commit フェーズでは、書き込みが適用されると、トランザクションはロックを独占的ロックにアップグレードしようとします。このため、次の処理を行います。

  • 影響を受けるデータに対する新しい共有読み取りロック リクエストをブロックします。
  • そのデータに対する既存の共有読み取りロックがすべて解除されるまで待機します。
  • すべての共有読み取りロックが解除されると、独占的ロックが設定され、書き込みの期間中、データへの排他アクセスが許可されます。

ロックについての注意事項:

  • 粒度: Spanner は、行と列の粒度でロックを適用します。つまり、トランザクション T1 が行 albumid の列 A のロックを保持している場合でも、トランザクション T2 は競合することなく同じ行 albumid の列 B に同時に書き込むことができます。
  • 読み取りなしの書き込み: 読み取りなしの書き込みの場合、Spanner は独占的ロックを必要としません。代わりに、書き込み共有ロックを使用します。これは、読み取りのない書き込みの適用順序が commit タイムスタンプによって決定されるため、複数の書き込み元が競合することなく同じアイテムを同時に操作できるためです。独占的ロックは、トランザクションが最初に書き込むデータを読み取る場合にのみ必要です。
  • 行検索のセカンダリ インデックス: 読み取り / 書き込みトランザクション内で行検索を実行する場合は、セカンダリ インデックスを使用するとパフォーマンスが大幅に向上します。セカンダリ インデックスを使用してスキャンされる行の範囲を制限することで、Spanner はテーブル内のロックされる行数を減らし、特定の範囲外の行を同時に変更できるようにします。
  • 外部リソースへの独占的アクセス: Spanner の内部ロックは、Spanner データベース内のデータ整合性を確保するように設計されています。Spanner 以外のリソースへの独占的なアクセスを保証するために使用しないでください。Spanner は、コンピューティング リソース間のデータ移動などの内部システム最適化を含め、さまざまな理由でトランザクションを中止できます。トランザクションが再試行された場合(アプリケーション コードによって明示的に試行されたか、Spanner JDBC ドライバなどのクライアント ライブラリによって暗黙に試行されたかにかかわらず)、ロックが実施されたことが保証されるのは、commit の試行が成功した間のみです。
  • ロックの統計情報: データベース内のロックの競合を診断して調査するには、ロックの統計情報のイントロスペクション ツールを使用します。

デッドロックの検出

Spanner は、複数のトランザクションがデッドロックに陥る可能性を検出し、1 つを除くすべてのトランザクションを強制的に中止します。次のシナリオについて考えてみましょう。Txn1 はレコード A のロックを保持しており、レコード B がロックできる状態になるのを待っています。一方、Txn2 はレコード B のロックを保持しており、レコード A がロックできる状態になるのを待っています。この問題を解決するには、一方のトランザクションを中止し、そのロックを解放して、もう一方のトランザクションの処理を行う必要があります。

Spanner は、デッドロックの検出に標準の wound-wait アルゴリズムを使用します。Spanner は、競合するロックをリクエストする各トランザクションの経過時間を内部で追跡します。これにより、古いトランザクションが新しいトランザクションを中止できます。古いトランザクションとは、最も早い読み取り、クエリ、commit がより早く発生したトランザクションです。

Spanner は、古いトランザクションを優先することで、すべてのトランザクションが、時間の経過に伴い、他のトランザクションよりも古くなって優先順位が高くなることで、最終的にロックを取得できるようにしています。たとえば、ライター共有ロックを必要とする古いトランザクションは、リーダー共有ロックを保持している新しいトランザクションを中止できます。

分散実行

Spanner は、複数のサーバーにまたがるデータに対してトランザクションを実行できますが、この機能を使用すると、単一サーバー トランザクションと比較してパフォーマンスが低下します。

分散できるトランザクションのタイプSpanner は、データベース行の責任を多くのサーバーに分散できます。通常、行とそれに対応するインターリーブ テーブルの行は、近隣のキーを持つ同じテーブル内の 2 つの行として、同じサーバーによって処理されます。Spanner は、さまざまなサーバー上の行にまたがるトランザクションを実行できます。ただし、一般的に、同じ場所に配置された多くの行に影響するトランザクションは、データベース全体または大きなテーブル全体に散在する多くの行に影響するトランザクションよりも高速かつ低コストです。

Spanner で最も効率的なトランザクションには、アトミックに適用する必要がある読み取りと書き込みのみが含まれます。すべての読み取りと書き込みがキースペースの同じ部分にあるデータにアクセスする場合に、トランザクションは最も速くなります。

読み取り専用トランザクション

Spanner は、ロック型読み書きトランザクションに加えて、読み取り専用トランザクションも備えています。

同一タイムスタンプで複数の読み取りを実行する必要がある場合、読み取り専用トランザクションを使用してください。読み取りを Spanner の単一読み取りメソッドで表現できる場合、代わりにその単一読み取りメソッドを使用する必要があります。このような単一読み取りメソッドを使用した場合のパフォーマンスは、読み取り専用トランザクションで実行される単一の読み取りのパフォーマンスに匹敵する必要があります。

大量のデータを読み取る場合は、パーティションを使用してデータを並列で読み取ることを検討してください。

読み取り専用トランザクションは書き込まないので、ロックを保持することはなく、他のトランザクションをブロックすることもありません。読み取り専用トランザクションはトランザクションの commit 履歴の整合性のあるプレフィックスを監視しているので、アプリケーションは常に整合性のあるデータを取得できます。

インターフェース

Spanner は、トランザクションが中止した場合は再試行をしながら、読み取り専用トランザクションのコンテキストにおける一連の作業を実行するためのインターフェースを提供します。

以下は、読み取り専用トランザクションを使用して、同一のタイムスタンプの 2 つの読み取りで整合性のあるデータを取得する方法を示しています。

C++

void ReadOnlyTransaction(google::cloud::spanner::Client client) {
  namespace spanner = ::google::cloud::spanner;
  auto read_only = spanner::MakeReadOnlyTransaction();

  spanner::SqlStatement select(
      "SELECT SingerId, AlbumId, AlbumTitle FROM Albums");
  using RowType = std::tuple<std::int64_t, std::int64_t, std::string>;

  // Read#1.
  auto rows1 = client.ExecuteQuery(read_only, select);
  std::cout << "Read 1 results\n";
  for (auto& row : spanner::StreamOf<RowType>(rows1)) {
    if (!row) throw std::move(row).status();
    std::cout << "SingerId: " << std::get<0>(*row)
              << " AlbumId: " << std::get<1>(*row)
              << " AlbumTitle: " << std::get<2>(*row) << "\n";
  }
  // Read#2. Even if changes occur in-between the reads the transaction ensures
  // that Read #1 and Read #2 return the same data.
  auto rows2 = client.ExecuteQuery(read_only, select);
  std::cout << "Read 2 results\n";
  for (auto& row : spanner::StreamOf<RowType>(rows2)) {
    if (!row) throw std::move(row).status();
    std::cout << "SingerId: " << std::get<0>(*row)
              << " AlbumId: " << std::get<1>(*row)
              << " AlbumTitle: " << std::get<2>(*row) << "\n";
  }
}

C#


using Google.Cloud.Spanner.Data;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Transactions;

public class QueryDataWithTransactionAsyncSample
{
    public class Album
    {
        public int SingerId { get; set; }
        public int AlbumId { get; set; }
        public string AlbumTitle { get; set; }
    }

    public async Task<List<Album>> QueryDataWithTransactionAsync(string projectId, string instanceId, string databaseId)
    {
        string connectionString = $"Data Source=projects/{projectId}/instances/{instanceId}/databases/{databaseId}";

        var albums = new List<Album>();
        using TransactionScope scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled);
        using var connection = new SpannerConnection(connectionString);

        // Opens the connection so that the Spanner transaction included in the TransactionScope
        // is read-only TimestampBound.Strong.
        await connection.OpenAsync(SpannerTransactionCreationOptions.ReadOnly, options: null, cancellationToken: default);
        using var cmd = connection.CreateSelectCommand("SELECT SingerId, AlbumId, AlbumTitle FROM Albums");

        // Read #1.
        using (var reader = await cmd.ExecuteReaderAsync())
        {
            while (await reader.ReadAsync())
            {
                Console.WriteLine("SingerId : " + reader.GetFieldValue<string>("SingerId")
                    + " AlbumId : " + reader.GetFieldValue<string>("AlbumId")
                    + " AlbumTitle : " + reader.GetFieldValue<string>("AlbumTitle"));
            }
        }

        // Read #2. Even if changes occur in-between the reads,
        // the transaction ensures that Read #1 and Read #2
        // return the same data.
        using (var reader = await cmd.ExecuteReaderAsync())
        {
            while (await reader.ReadAsync())
            {
                albums.Add(new Album
                {
                    AlbumId = reader.GetFieldValue<int>("AlbumId"),
                    SingerId = reader.GetFieldValue<int>("SingerId"),
                    AlbumTitle = reader.GetFieldValue<string>("AlbumTitle")
                });
            }
        }
        scope.Complete();
        Console.WriteLine("Transaction complete.");
        return albums;
    }
}

Go


import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/spanner"
	"google.golang.org/api/iterator"
)

func readOnlyTransaction(w io.Writer, db string) error {
	ctx := context.Background()
	client, err := spanner.NewClient(ctx, db)
	if err != nil {
		return err
	}
	defer client.Close()

	ro := client.ReadOnlyTransaction()
	defer ro.Close()
	stmt := spanner.Statement{SQL: `SELECT SingerId, AlbumId, AlbumTitle FROM Albums`}
	iter := ro.Query(ctx, stmt)
	defer iter.Stop()
	for {
		row, err := iter.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			return err
		}
		var singerID int64
		var albumID int64
		var albumTitle string
		if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil {
			return err
		}
		fmt.Fprintf(w, "%d %d %s\n", singerID, albumID, albumTitle)
	}

	iter = ro.Read(ctx, "Albums", spanner.AllKeys(), []string{"SingerId", "AlbumId", "AlbumTitle"})
	defer iter.Stop()
	for {
		row, err := iter.Next()
		if err == iterator.Done {
			return nil
		}
		if err != nil {
			return err
		}
		var singerID int64
		var albumID int64
		var albumTitle string
		if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil {
			return err
		}
		fmt.Fprintf(w, "%d %d %s\n", singerID, albumID, albumTitle)
	}
}

Java

static void readOnlyTransaction(DatabaseClient dbClient) {
  // ReadOnlyTransaction must be closed by calling close() on it to release resources held by it.
  // We use a try-with-resource block to automatically do so.
  try (ReadOnlyTransaction transaction = dbClient.readOnlyTransaction()) {
    try (ResultSet queryResultSet =
        transaction.executeQuery(
            Statement.of("SELECT SingerId, AlbumId, AlbumTitle FROM Albums"))) {
      while (queryResultSet.next()) {
        System.out.printf(
            "%d %d %s\n",
            queryResultSet.getLong(0), queryResultSet.getLong(1), queryResultSet.getString(2));
      }
    } // queryResultSet.close() is automatically called here
    try (ResultSet readResultSet =
        transaction.read(
          "Albums", KeySet.all(), Arrays.asList("SingerId", "AlbumId", "AlbumTitle"))) {
      while (readResultSet.next()) {
        System.out.printf(
            "%d %d %s\n",
            readResultSet.getLong(0), readResultSet.getLong(1), readResultSet.getString(2));
      }
    } // readResultSet.close() is automatically called here
  } // transaction.close() is automatically called here
}

Node.js

// Imports the Google Cloud client library
const {Spanner} = require('@google-cloud/spanner');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// const projectId = 'my-project-id';
// const instanceId = 'my-instance';
// const databaseId = 'my-database';

// Creates a client
const spanner = new Spanner({
  projectId: projectId,
});

// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);

// Gets a transaction object that captures the database state
// at a specific point in time
database.getSnapshot(async (err, transaction) => {
  if (err) {
    console.error(err);
    return;
  }
  const queryOne = 'SELECT SingerId, AlbumId, AlbumTitle FROM Albums';

  try {
    // Read #1, using SQL
    const [qOneRows] = await transaction.run(queryOne);

    qOneRows.forEach(row => {
      const json = row.toJSON();
      console.log(
        `SingerId: ${json.SingerId}, AlbumId: ${json.AlbumId}, AlbumTitle: ${json.AlbumTitle}`,
      );
    });

    const queryTwo = {
      columns: ['SingerId', 'AlbumId', 'AlbumTitle'],
    };

    // Read #2, using the `read` method. Even if changes occur
    // in-between the reads, the transaction ensures that both
    // return the same data.
    const [qTwoRows] = await transaction.read('Albums', queryTwo);

    qTwoRows.forEach(row => {
      const json = row.toJSON();
      console.log(
        `SingerId: ${json.SingerId}, AlbumId: ${json.AlbumId}, AlbumTitle: ${json.AlbumTitle}`,
      );
    });

    console.log('Successfully executed read-only transaction.');
  } catch (err) {
    console.error('ERROR:', err);
  } finally {
    transaction.end();
    // Close the database when finished.
    await database.close();
  }
});

PHP

use Google\Cloud\Spanner\SpannerClient;

/**
 * Reads data inside of a read-only transaction.
 *
 * Within the read-only transaction, or "snapshot", the application sees
 * consistent view of the database at a particular timestamp.
 * Example:
 * ```
 * read_only_transaction($instanceId, $databaseId);
 * ```
 *
 * @param string $instanceId The Spanner instance ID.
 * @param string $databaseId The Spanner database ID.
 */
function read_only_transaction(string $instanceId, string $databaseId): void
{
    $spanner = new SpannerClient();
    $instance = $spanner->instance($instanceId);
    $database = $instance->database($databaseId);

    $snapshot = $database->snapshot();
    $results = $snapshot->execute(
        'SELECT SingerId, AlbumId, AlbumTitle FROM Albums'
    );
    print('Results from the first read:' . PHP_EOL);
    foreach ($results as $row) {
        printf('SingerId: %s, AlbumId: %s, AlbumTitle: %s' . PHP_EOL,
            $row['SingerId'], $row['AlbumId'], $row['AlbumTitle']);
    }

    // Perform another read using the `read` method. Even if the data
    // is updated in-between the reads, the snapshot ensures that both
    // return the same data.
    $keySet = $spanner->keySet(['all' => true]);
    $results = $database->read(
        'Albums',
        $keySet,
        ['SingerId', 'AlbumId', 'AlbumTitle']
    );

    print('Results from the second read:' . PHP_EOL);
    foreach ($results->rows() as $row) {
        printf('SingerId: %s, AlbumId: %s, AlbumTitle: %s' . PHP_EOL,
            $row['SingerId'], $row['AlbumId'], $row['AlbumTitle']);
    }
}

Python

def read_only_transaction(instance_id, database_id):
    """Reads data inside of a read-only transaction.

    Within the read-only transaction, or "snapshot", the application sees
    consistent view of the database at a particular timestamp.
    """
    spanner_client = spanner.Client()
    instance = spanner_client.instance(instance_id)
    database = instance.database(database_id)

    with database.snapshot(multi_use=True) as snapshot:
        # Read using SQL.
        results = snapshot.execute_sql(
            "SELECT SingerId, AlbumId, AlbumTitle FROM Albums"
        )

        print("Results from first read:")
        for row in results:
            print("SingerId: {}, AlbumId: {}, AlbumTitle: {}".format(*row))

        # Perform another read using the `read` method. Even if the data
        # is updated in-between the reads, the snapshot ensures that both
        # return the same data.
        keyset = spanner.KeySet(all_=True)
        results = snapshot.read(
            table="Albums", columns=("SingerId", "AlbumId", "AlbumTitle"), keyset=keyset
        )

        print("Results from second read:")
        for row in results:
            print("SingerId: {}, AlbumId: {}, AlbumTitle: {}".format(*row))

Ruby

# project_id  = "Your Google Cloud project ID"
# instance_id = "Your Spanner instance ID"
# database_id = "Your Spanner database ID"

require "google/cloud/spanner"

spanner = Google::Cloud::Spanner.new project: project_id
client  = spanner.client instance_id, database_id

client.snapshot do |snapshot|
  snapshot.execute("SELECT SingerId, AlbumId, AlbumTitle FROM Albums").rows.each do |row|
    puts "#{row[:AlbumId]} #{row[:AlbumTitle]} #{row[:SingerId]}"
  end

  # Even if changes occur in-between the reads, the transaction ensures that
  # both return the same data.
  snapshot.read("Albums", [:AlbumId, :AlbumTitle, :SingerId]).rows.each do |row|
    puts "#{row[:AlbumId]} #{row[:AlbumTitle]} #{row[:SingerId]}"
  end
end

セマンティクス

このセクションでは、読み取り専用トランザクションのセマンティクスについて説明します。

スナップショット読み取り専用トランザクション

Spanner で読み取り専用トランザクションが実行されると、すべての読み取りが単一の論理ポイントで実行されます。つまり、読み取り専用トランザクションと他の同時実行読み取りおよび書き込みの両方が、その特定の時点でのデータベースの一貫性のあるスナップショットを確認します。

これらのスナップショット読み取り専用トランザクションは、ロック型読み取り / 書き込みトランザクションと比較して、整合性のある読み取りを実現するよりシンプルなアプローチを提供します。その理由は次のとおりです。

  • ロックなし: 読み取り専用トランザクションはロックを取得しません。その代わりに、Spanner タイムスタンプを選択して、そのデータの過去のバージョンに対してすべての読み取りを実行します。ロックを使用しないため、同時読み書きトランザクションをブロックしません。
  • 中止なし: これらのトランザクションはアボートしません。選択した読み取りタイムスタンプがガベージ コレクションされると失敗する可能性がありますが、Spanner のデフォルトのガベージ コレクション ポリシーは通常、ほとんどのアプリケーションでこの問題が発生しないほど十分な大きさです。
  • commit やロールバックなし: 読み取り専用トランザクションでは、sessions.commitsessions.rollback の呼び出しは必要ありません。実際には、呼び出しは禁止されています。

スナップショット トランザクションを実行するために、クライアントはタイムスタンプの境界を定義します。これにより、Spanner に読み取りタイムスタンプの選択方法が指示されます。タイムスタンプの範囲のタイプは次のとおりです。

  • 強力な読み取り: これらの読み取りでは、読み取りが開始される前に commit されたすべてのトランザクションの結果を確実に確認できます。1 回の読み取り内のすべての行は一貫性があります。ただし、強力な読み取りは繰り返せません。強力な読み取りはタイムスタンプを返しますが、同じタイムスタンプで再度読み取ることは繰り返すことができます。同時に書き込みが行われた場合、2 つの連続する強力な読み取り専用トランザクションで異なる結果が生成されることがあります。変更ストリームのクエリでは、この境界を使用する必要があります。詳細については、TransactionOptions.ReadOnly.strong をご覧ください。
  • 正確なステイルネス: このオプションは、絶対タイムスタンプまたは現在の時間に関連するステイルネス期間として指定したタイムスタンプで読み取りを実行します。これにより、そのタイムスタンプまでのグローバル トランザクション履歴の一貫したプレフィックスが確実に観測され、読み取りタイムスタンプ以下のタイムスタンプで commit される可能性のある競合するトランザクションがブロックされます。境界付きの古いデータモードよりもわずかに高速ですが、古いデータが返される可能性があります。詳細については、TransactionOptions.ReadOnly.read_timestampTransactionOptions.ReadOnly.exact_staleness をご覧ください。
  • 境界付きステイルネス: Spanner は、ユーザー定義のステイルネス制限内で最も新しいタイムスタンプを選択し、ブロックすることなく最も近くにある利用可能なレプリカで実行できるようにします。返されるすべての行は一貫性があります。強い読み取りと同様に、バウンド境界付きステイルネスは繰り返せません。同じ境界でも、異なる読み取りが異なるタイムスタンプで実行される可能性があるためです。これらの読み取りは 2 つのフェーズ(タイムスタンプ ネゴシエーションと読み取り)で動作し、通常は正確なステイルネスよりも少し遅くなりますが、より新しい結果が返されることが多く、ローカル レプリカで実行される可能性が高くなります。このモードは、1 回限りの読み取り専用トランザクションでのみ使用できます。これは、タイムスタンプ ネゴシエーションで、どの行が事前に読み取られるかを知る必要があるためです。詳細については、TransactionOptions.ReadOnly.max_stalenessTransactionOptions.ReadOnly.min_read_timestamp をご覧ください。

パーティション化 DML トランザクション

パーティション化 DML を使用すると、トランザクション数の上限到達や、テーブル全体がロックされることなく、大規模な UPDATE ステートメントや DELETE ステートメントを実行できます。Spanner は、キースペースをパーティショニングし、別々の読み取り / 書き込みトランザクション内で各パーティションの DML ステートメントを実行することで、これを実現します。

パーティション化されていない DML を使用するには、コード内で明示的に作成した読み取り / 書き込みトランザクション内でステートメントを実行します。詳細については、DML の使用をご覧ください。

インターフェース

Spanner には、単一のパーティション化 DML ステートメントを実行するための TransactionOptions.partitionedDml インターフェースが用意されています。

次のコード例では、Albums テーブルの MarketingBudget 列を更新します。

C++

ExecutePartitionedDml() 関数を使用して、パーティション化 DML ステートメントを実行します。

void DmlPartitionedUpdate(google::cloud::spanner::Client client) {
  namespace spanner = ::google::cloud::spanner;
  auto result = client.ExecutePartitionedDml(
      spanner::SqlStatement("UPDATE Albums SET MarketingBudget = 100000"
                            "  WHERE SingerId > 1"));
  if (!result) throw std::move(result).status();
  std::cout << "Updated at least " << result->row_count_lower_bound
            << " row(s) [spanner_dml_partitioned_update]\n";
}

C#

ExecutePartitionedUpdateAsync() メソッドを使用して、パーティション化 DML ステートメントを実行します。


using Google.Cloud.Spanner.Data;
using System;
using System.Threading.Tasks;

public class UpdateUsingPartitionedDmlCoreAsyncSample
{
    public async Task<long> UpdateUsingPartitionedDmlCoreAsync(string projectId, string instanceId, string databaseId)
    {
        string connectionString = $"Data Source=projects/{projectId}/instances/{instanceId}/databases/{databaseId}";

        using var connection = new SpannerConnection(connectionString);
        await connection.OpenAsync();

        using var cmd = connection.CreateDmlCommand("UPDATE Albums SET MarketingBudget = 100000 WHERE SingerId > 1");
        long rowCount = await cmd.ExecutePartitionedUpdateAsync();

        Console.WriteLine($"{rowCount} row(s) updated...");
        return rowCount;
    }
}

Go

PartitionedUpdate() メソッドを使用して、パーティション化 DML ステートメントを実行します。


import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/spanner"
)

func updateUsingPartitionedDML(w io.Writer, db string) error {
	ctx := context.Background()
	client, err := spanner.NewClient(ctx, db)
	if err != nil {
		return err
	}
	defer client.Close()

	stmt := spanner.Statement{SQL: "UPDATE Albums SET MarketingBudget = 100000 WHERE SingerId > 1"}
	rowCount, err := client.PartitionedUpdate(ctx, stmt)
	if err != nil {
		return err
	}
	fmt.Fprintf(w, "%d record(s) updated.\n", rowCount)
	return nil
}

Java

executePartitionedUpdate() メソッドを使用して、パーティション化 DML ステートメントを実行します。

static void updateUsingPartitionedDml(DatabaseClient dbClient) {
  String sql = "UPDATE Albums SET MarketingBudget = 100000 WHERE SingerId > 1";
  long rowCount = dbClient.executePartitionedUpdate(Statement.of(sql));
  System.out.printf("%d records updated.\n", rowCount);
}

Node.js

runPartitionedUpdate() メソッドを使用して、パーティション化 DML ステートメントを実行します。

// Imports the Google Cloud client library
const {Spanner} = require('@google-cloud/spanner');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// const projectId = 'my-project-id';
// const instanceId = 'my-instance';
// const databaseId = 'my-database';

// Creates a client
const spanner = new Spanner({
  projectId: projectId,
});

// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);

try {
  const [rowCount] = await database.runPartitionedUpdate({
    sql: 'UPDATE Albums SET MarketingBudget = 100000 WHERE SingerId > 1',
  });
  console.log(`Successfully updated ${rowCount} records.`);
} catch (err) {
  console.error('ERROR:', err);
} finally {
  // Close the database when finished.
  database.close();
}

PHP

executePartitionedUpdate() メソッドを使用して、パーティション化 DML ステートメントを実行します。

use Google\Cloud\Spanner\SpannerClient;

/**
 * Updates sample data in the database by partition with a DML statement.
 *
 * This updates the `MarketingBudget` column which must be created before
 * running this sample. You can add the column by running the `add_column`
 * sample or by running this DDL statement against your database:
 *
 *     ALTER TABLE Albums ADD COLUMN MarketingBudget INT64
 *
 * Example:
 * ```
 * update_data($instanceId, $databaseId);
 * ```
 *
 * @param string $instanceId The Spanner instance ID.
 * @param string $databaseId The Spanner database ID.
 */
function update_data_with_partitioned_dml(string $instanceId, string $databaseId): void
{
    $spanner = new SpannerClient();
    $instance = $spanner->instance($instanceId);
    $database = $instance->database($databaseId);

    $rowCount = $database->executePartitionedUpdate(
        'UPDATE Albums SET MarketingBudget = 100000 WHERE SingerId > 1'
    );

    printf('Updated %d row(s).' . PHP_EOL, $rowCount);
}

Python

execute_partitioned_dml() メソッドを使用して、パーティション化 DML ステートメントを実行します。

# instance_id = "your-spanner-instance"
# database_id = "your-spanner-db-id"

spanner_client = spanner.Client()
instance = spanner_client.instance(instance_id)
database = instance.database(database_id)

row_ct = database.execute_partitioned_dml(
    "UPDATE Albums SET MarketingBudget = 100000 WHERE SingerId > 1"
)

print("{} records updated.".format(row_ct))

Ruby

execute_partitioned_update() メソッドを使用して、パーティション化 DML ステートメントを実行します。

# project_id  = "Your Google Cloud project ID"
# instance_id = "Your Spanner instance ID"
# database_id = "Your Spanner database ID"

require "google/cloud/spanner"

spanner = Google::Cloud::Spanner.new project: project_id
client  = spanner.client instance_id, database_id

row_count = client.execute_partition_update(
  "UPDATE Albums SET MarketingBudget = 100000 WHERE SingerId > 1"
)

puts "#{row_count} records updated."

次のコード例では、SingerId 列に基づいて Singers テーブルから行を削除します。

C++

void DmlPartitionedDelete(google::cloud::spanner::Client client) {
  namespace spanner = ::google::cloud::spanner;
  auto result = client.ExecutePartitionedDml(
      spanner::SqlStatement("DELETE FROM Singers WHERE SingerId > 10"));
  if (!result) throw std::move(result).status();
  std::cout << "Deleted at least " << result->row_count_lower_bound
            << " row(s) [spanner_dml_partitioned_delete]\n";
}

C#


using Google.Cloud.Spanner.Data;
using System;
using System.Threading.Tasks;

public class DeleteUsingPartitionedDmlCoreAsyncSample
{
    public async Task<long> DeleteUsingPartitionedDmlCoreAsync(string projectId, string instanceId, string databaseId)
    {
        string connectionString = $"Data Source=projects/{projectId}/instances/{instanceId}/databases/{databaseId}";

        using var connection = new SpannerConnection(connectionString);
        await connection.OpenAsync();

        using var cmd = connection.CreateDmlCommand("DELETE FROM Singers WHERE SingerId > 10");
        long rowCount = await cmd.ExecutePartitionedUpdateAsync();

        Console.WriteLine($"{rowCount} row(s) deleted...");
        return rowCount;
    }
}

Go


import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/spanner"
)

func deleteUsingPartitionedDML(w io.Writer, db string) error {
	ctx := context.Background()
	client, err := spanner.NewClient(ctx, db)
	if err != nil {
		return err
	}
	defer client.Close()

	stmt := spanner.Statement{SQL: "DELETE FROM Singers WHERE SingerId > 10"}
	rowCount, err := client.PartitionedUpdate(ctx, stmt)
	if err != nil {
		return err

	}
	fmt.Fprintf(w, "%d record(s) deleted.", rowCount)
	return nil
}

Java

static void deleteUsingPartitionedDml(DatabaseClient dbClient) {
  String sql = "DELETE FROM Singers WHERE SingerId > 10";
  long rowCount = dbClient.executePartitionedUpdate(Statement.of(sql));
  System.out.printf("%d records deleted.\n", rowCount);
}

Node.js

// Imports the Google Cloud client library
const {Spanner} = require('@google-cloud/spanner');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// const projectId = 'my-project-id';
// const instanceId = 'my-instance';
// const databaseId = 'my-database';

// Creates a client
const spanner = new Spanner({
  projectId: projectId,
});

// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);

try {
  const [rowCount] = await database.runPartitionedUpdate({
    sql: 'DELETE FROM Singers WHERE SingerId > 10',
  });
  console.log(`Successfully deleted ${rowCount} records.`);
} catch (err) {
  console.error('ERROR:', err);
} finally {
  // Close the database when finished.
  database.close();
}

PHP

use Google\Cloud\Spanner\SpannerClient;

/**
 * Delete sample data in the database by partition with a DML statement.
 *
 * This updates the `MarketingBudget` column which must be created before
 * running this sample. You can add the column by running the `add_column`
 * sample or by running this DDL statement against your database:
 *
 *     ALTER TABLE Albums ADD COLUMN MarketingBudget INT64
 *
 * Example:
 * ```
 * update_data($instanceId, $databaseId);
 * ```
 *
 * @param string $instanceId The Spanner instance ID.
 * @param string $databaseId The Spanner database ID.
 */
function delete_data_with_partitioned_dml(string $instanceId, string $databaseId): void
{
    $spanner = new SpannerClient();
    $instance = $spanner->instance($instanceId);
    $database = $instance->database($databaseId);

    $rowCount = $database->executePartitionedUpdate(
        'DELETE FROM Singers WHERE SingerId > 10'
    );

    printf('Deleted %d row(s).' . PHP_EOL, $rowCount);
}

Python

# instance_id = "your-spanner-instance"
# database_id = "your-spanner-db-id"
spanner_client = spanner.Client()
instance = spanner_client.instance(instance_id)
database = instance.database(database_id)

row_ct = database.execute_partitioned_dml("DELETE FROM Singers WHERE SingerId > 10")

print("{} record(s) deleted.".format(row_ct))

Ruby

# project_id  = "Your Google Cloud project ID"
# instance_id = "Your Spanner instance ID"
# database_id = "Your Spanner database ID"

require "google/cloud/spanner"

spanner = Google::Cloud::Spanner.new project: project_id
client  = spanner.client instance_id, database_id

row_count = client.execute_partition_update(
  "DELETE FROM Singers WHERE SingerId > 10"
)

puts "#{row_count} records deleted."

セマンティクス

このセクションでは、パーティション化 DML のセマンティクスについて説明します。

パーティション化 DML の実行について

クライアント ライブラリのメソッドと Google Cloud CLI のいずれを使用する場合でも、パーティション化 DML ステートメントは一度に 1 つしか実行できません。

パーティション化されたトランザクションは、commit またはロールバックをサポートしていません。Spanner は、DML ステートメントをすぐに実行して適用します。オペレーションをキャンセルした場合やオペレーションが失敗した場合、Spanner は実行中のすべてのパーティションをキャンセルし、残りのパーティションを開始しません。ただし、Spanner は、すでに実行されているパーティションをロールバックしません。

パーティション化 DML のロック取得戦略

ロック競合を減らすため、パーティション化 DML は WHERE 句に一致する行に対してのみ読み取りロックを取得します。各パーティションで使用される小規模で独立したトランザクションも、ロックの保持時間が短くなります。

セッションのトランザクション数の上限

Spanner の各セッションには、一度に 1 つのアクティブなトランザクションを設定できます。これには、内部でトランザクションを使用してこの上限にカウントされるスタンドアロンの読み取りとクエリが含まれます。トランザクションが完了すると、セッションは次のトランザクションですぐに再利用できます。トランザクションごとに新しいセッションを作成する必要はありません。

古い読み取りタイムスタンプとバージョン ガベージ コレクション

Spanner は、削除または上書きされたデータを収集してストレージを再利用するために、バージョン ガベージ コレクションを実行します。デフォルトでは、1 時間より古いデータは再利用されます。Spanner は、構成された VERSION_RETENTION_PERIOD より古いタイムスタンプで読み取りを実行できません。デフォルトは 1 時間ですが、最大 1 週間まで構成できます。実行中に読み取りが古すぎると、読み取りは失敗し、FAILED_PRECONDITION エラーが返されます。

変更ストリームのクエリ

変更ストリームは、データベース全体、特定のテーブル、またはデータベース内の定義された一連の列のデータ変更をモニタリングするように構成できるスキーマ オブジェクトです。

変更ストリームを作成すると、Spanner は対応する SQL テーブル値関数(TVF)を定義します。この TVF を使用すると、sessions.executeStreamingSql メソッドを使用して、関連する変更ストリームの変更レコードをクエリできます。TVF の名前は変更ストリームの名前から生成され、常に READ_ で始まります。

変更ストリーム TVF に対するすべてのクエリは、強力な読み取り専用 timestamp_bound を使用した 1 回使用の読み取り専用トランザクション内の sessions.executeStreamingSql API を使用して実行する必要があります。変更ストリーム TVF を使用すると、期間の start_timestampend_timestamp を指定できます。保持期間内のすべての変更レコードには、この強力な読み取り専用 timestamp_bound を使用してアクセスできます。他のすべての TransactionOptions は、変更ストリーム クエリでは無効です。

また、TransactionOptions.read_only.return_read_timestamptrue に設定されている場合、トランザクションを記述する Transaction メッセージは、有効な読み取りタイムスタンプではなく、特別な値 2^63 - 2 を返します。この特殊な値を破棄し、後続のクエリには使用しないでください。

詳細については、変更ストリームのクエリ ワークフローをご覧ください。

アイドル状態のトランザクション

未処理の読み取りまたは SQL クエリがなく、過去 10 秒間に開始されていないトランザクションは、アイドル状態と見なされます。Spanner では、アイドル状態のトランザクションを中止して、ロックが無期限に保持されることを回避できます。アイドル状態のトランザクションが中止されると、commit は失敗し、ABORTED エラーが返されます。トランザクション内で SELECT 1 などの小さなクエリを定期的に実行すると、トランザクションがアイドル状態になるのを防ぐことができます。