交易總覽

本頁說明 Spanner 中的交易,並介紹 Spanner 的讀寫、唯讀和分區 DML 交易介面。

Spanner 中的交易是一組以不可分割的形式,在單一邏輯時點,跨資料庫中的資料欄、資料列與資料表執行的讀取與寫入作業。

工作階段可用於在 Spanner 資料庫中執行交易。工作階段代表與 Spanner 資料庫服務的邏輯通訊管道。工作階段一次可執行單一或多筆交易。詳情請參閱「工作階段」。

交易類型

Spanner 支援下列交易類型,每種交易類型都專為特定資料互動模式設計:

  • 讀寫:這類交易會使用消極鎖定,並視需要進行兩階段修訂。這些要求可能會失敗,因此需要重試。雖然只能存取單一資料庫,但可以修改該資料庫中多個資料表的資料。

  • 唯讀:這類交易可確保多次讀取作業都會產生一致的結果,但不允許修改資料。為確保一致性,系統會決定執行時間戳記,或在使用者設定的過去時間戳記執行。與讀寫交易不同,唯讀交易不需要修訂作業或鎖定,但可能會暫停,等待進行中的寫入作業完成。

  • 分區 DML:這個交易類型會以分區 DML 作業執行 DML 陳述式。這項功能經過最佳化,可進行大規模的資料更新和刪除作業,例如資料清理或大量插入資料。如要執行大量寫入作業,但不需要原子交易,建議使用批次寫入作業。詳情請參閱「使用批次寫入修改資料」。

讀寫交易

使用鎖定讀寫交易,以不可分割的形式讀取、修改及寫入資料庫中任何位置的資料。這類交易在外部是一致的。

盡量縮短交易的有效時間。交易時間越短,成功提交的機率就越高,爭用情況也會減少。只要交易持續執行讀取作業,且交易尚未透過 sessions.commitsessions.rollback 作業終止,Spanner 就會嘗試保持讀取鎖定狀態。如果用戶端長時間處於閒置狀態,Spanner 可能會釋出交易的鎖定,並中止交易。

從概念上來說,讀寫交易包含零個或多個讀取作業或 SQL 陳述式,後面接著 sessions.commit。在 sessions.commit 之前,用戶端隨時可以傳送 sessions.rollback 要求來中止交易。

如要執行依據一或多項讀取作業的寫入作業,請使用鎖定讀寫交易:

  • 如要以不可分割的形式修訂一或多項寫入作業,請在相同的讀寫交易中執行這些寫入作業。舉例來說,如果您要從帳戶 A 轉 $200 美元到帳戶 B,請在同一筆交易中執行兩項寫入作業 (帳戶 A 減少 $200 美元,帳戶 B 增加 $200 美元),並讀取初始帳戶餘額。
  • 如要將帳戶 A 的餘額加倍,請在同一個交易中執行讀取和寫入作業。這可確保系統先讀取餘額,再將餘額加倍,然後更新餘額。
  • 如果您可能會根據一或多項讀取作業的結果,執行一或多項寫入作業,請在同一個讀寫交易中執行這些寫入和讀取作業,即使寫入作業未執行也一樣。舉例來說,如果只有在帳戶 A 目前的餘額大於 $500 美元時,才要從帳戶 A 轉 $200 美元到帳戶 B,請在同一個交易中加入帳戶 A 餘額的讀取作業和條件式寫入作業,即使轉帳作業未發生也一樣。

如要執行讀取作業,請使用單一讀取方法或唯讀交易:

  • 如果您只執行讀取作業,而且可以使用單一讀取方法陳述讀取作業,請使用該單一讀取方法或唯讀交易。與讀寫交易不同,單一讀取作業不會取得鎖定。

介面

Spanner 用戶端程式庫提供介面,可在讀寫交易中執行作業主體,並在交易取消時重試。Spanner 交易可能需要多次重試才能提交。

交易中止的原因有很多,舉例來說,如果兩個交易嘗試同時修改資料,可能會發生鎖死。在這種情況下,Spanner 會取消其中一個交易,讓另一個交易繼續進行。在極少數的情況下,Spanner 中的暫時事件也可能導致交易取消。

由於交易是不可分割的,取消交易不會影響資料庫。在同一工作階段中重試交易,提高成功率。每次重試導致 ABORTED 錯誤時,交易的鎖定優先順序就會提高。

在 Spanner 用戶端程式庫中使用交易時,您會以函式物件的形式定義交易主體。這個函式會封裝對一或多個資料庫表格執行的讀取和寫入作業。Spanner 用戶端程式庫會反覆執行這項函式,直到交易順利修訂,或發生無法重試的錯誤為止。

範例

假設您在Albums 資料表中含有 MarketingBudget 欄:

CREATE TABLE Albums (
  SingerId        INT64 NOT NULL,
  AlbumId         INT64 NOT NULL,
  AlbumTitle      STRING(MAX),
  MarketingBudget INT64
) PRIMARY KEY (SingerId, AlbumId);

行銷部門要求您從 Albums (2, 2) 的預算中撥出 $200,000 美元到 Albums (1, 1),但前提是該專輯的預算有足夠的金額。針對此項作業,您應使用鎖定讀寫交易,因為交易可能會根據讀取的結果來進行寫入作業。

以下顯示如何執行讀寫交易:

C++

void ReadWriteTransaction(google::cloud::spanner::Client client) {
  namespace spanner = ::google::cloud::spanner;
  using ::google::cloud::StatusOr;

  // A helper to read a single album MarketingBudget.
  auto get_current_budget =
      [](spanner::Client client, spanner::Transaction txn,
         std::int64_t singer_id,
         std::int64_t album_id) -> StatusOr<std::int64_t> {
    auto key = spanner::KeySet().AddKey(spanner::MakeKey(singer_id, album_id));
    auto rows = client.Read(std::move(txn), "Albums", std::move(key),
                            {"MarketingBudget"});
    using RowType = std::tuple<std::int64_t>;
    auto row = spanner::GetSingularRow(spanner::StreamOf<RowType>(rows));
    if (!row) return std::move(row).status();
    return std::get<0>(*std::move(row));
  };

  auto commit = client.Commit(
      [&client, &get_current_budget](
          spanner::Transaction const& txn) -> StatusOr<spanner::Mutations> {
        auto b1 = get_current_budget(client, txn, 1, 1);
        if (!b1) return std::move(b1).status();
        auto b2 = get_current_budget(client, txn, 2, 2);
        if (!b2) return std::move(b2).status();
        std::int64_t transfer_amount = 200000;

        return spanner::Mutations{
            spanner::UpdateMutationBuilder(
                "Albums", {"SingerId", "AlbumId", "MarketingBudget"})
                .EmplaceRow(1, 1, *b1 + transfer_amount)
                .EmplaceRow(2, 2, *b2 - transfer_amount)
                .Build()};
      });

  if (!commit) throw std::move(commit).status();
  std::cout << "Transfer was successful [spanner_read_write_transaction]\n";
}

C#


using Google.Cloud.Spanner.Data;
using System;
using System.Threading.Tasks;
using System.Transactions;

public class ReadWriteWithTransactionAsyncSample
{
    public async Task<int> ReadWriteWithTransactionAsync(string projectId, string instanceId, string databaseId)
    {
        // This sample transfers 200,000 from the MarketingBudget
        // field of the second Album to the first Album. Make sure to run
        // the Add Column and Write Data To New Column samples first,
        // in that order.

        string connectionString = $"Data Source=projects/{projectId}/instances/{instanceId}/databases/{databaseId}";

        using TransactionScope scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled);
        decimal transferAmount = 200000;
        decimal secondBudget = 0;
        decimal firstBudget = 0;

        using var connection = new SpannerConnection(connectionString);
        using var cmdLookup1 = connection.CreateSelectCommand("SELECT * FROM Albums WHERE SingerId = 2 AND AlbumId = 2");

        using (var reader = await cmdLookup1.ExecuteReaderAsync())
        {
            while (await reader.ReadAsync())
            {
                // Read the second album's budget.
                secondBudget = reader.GetFieldValue<decimal>("MarketingBudget");
                // Confirm second Album's budget is sufficient and
                // if not raise an exception. Raising an exception
                // will automatically roll back the transaction.
                if (secondBudget < transferAmount)
                {
                    throw new Exception($"The second album's budget {secondBudget} is less than the amount to transfer.");
                }
            }
        }

        // Read the first album's budget.
        using var cmdLookup2 = connection.CreateSelectCommand("SELECT * FROM Albums WHERE SingerId = 1 and AlbumId = 1");
        using (var reader = await cmdLookup2.ExecuteReaderAsync())
        {
            while (await reader.ReadAsync())
            {
                firstBudget = reader.GetFieldValue<decimal>("MarketingBudget");
            }
        }

        // Specify update command parameters.
        using var cmdUpdate = connection.CreateUpdateCommand("Albums", new SpannerParameterCollection
        {
            { "SingerId", SpannerDbType.Int64 },
            { "AlbumId", SpannerDbType.Int64 },
            { "MarketingBudget", SpannerDbType.Int64 },
        });

        // Update second album to remove the transfer amount.
        secondBudget -= transferAmount;
        cmdUpdate.Parameters["SingerId"].Value = 2;
        cmdUpdate.Parameters["AlbumId"].Value = 2;
        cmdUpdate.Parameters["MarketingBudget"].Value = secondBudget;
        var rowCount = await cmdUpdate.ExecuteNonQueryAsync();

        // Update first album to add the transfer amount.
        firstBudget += transferAmount;
        cmdUpdate.Parameters["SingerId"].Value = 1;
        cmdUpdate.Parameters["AlbumId"].Value = 1;
        cmdUpdate.Parameters["MarketingBudget"].Value = firstBudget;
        rowCount += await cmdUpdate.ExecuteNonQueryAsync();
        scope.Complete();
        Console.WriteLine("Transaction complete.");
        return rowCount;
    }
}

Go


import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/spanner"
)

func writeWithTransaction(w io.Writer, db string) error {
	ctx := context.Background()
	client, err := spanner.NewClient(ctx, db)
	if err != nil {
		return err
	}
	defer client.Close()

	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
		getBudget := func(key spanner.Key) (int64, error) {
			row, err := txn.ReadRow(ctx, "Albums", key, []string{"MarketingBudget"})
			if err != nil {
				return 0, err
			}
			var budget int64
			if err := row.Column(0, &budget); err != nil {
				return 0, err
			}
			return budget, nil
		}
		album2Budget, err := getBudget(spanner.Key{2, 2})
		if err != nil {
			return err
		}
		const transferAmt = 200000
		if album2Budget >= transferAmt {
			album1Budget, err := getBudget(spanner.Key{1, 1})
			if err != nil {
				return err
			}
			album1Budget += transferAmt
			album2Budget -= transferAmt
			cols := []string{"SingerId", "AlbumId", "MarketingBudget"}
			txn.BufferWrite([]*spanner.Mutation{
				spanner.Update("Albums", cols, []interface{}{1, 1, album1Budget}),
				spanner.Update("Albums", cols, []interface{}{2, 2, album2Budget}),
			})
			fmt.Fprintf(w, "Moved %d from Album2's MarketingBudget to Album1's.", transferAmt)
		}
		return nil
	})
	return err
}

Java

static void writeWithTransaction(DatabaseClient dbClient) {
  dbClient
      .readWriteTransaction()
      .run(transaction -> {
        // Transfer marketing budget from one album to another. We do it in a transaction to
        // ensure that the transfer is atomic.
        Struct row =
            transaction.readRow("Albums", Key.of(2, 2), Arrays.asList("MarketingBudget"));
        long album2Budget = row.getLong(0);
        // Transaction will only be committed if this condition still holds at the time of
        // commit. Otherwise it will be aborted and the callable will be rerun by the
        // client library.
        long transfer = 200000;
        if (album2Budget >= transfer) {
          long album1Budget =
              transaction
                  .readRow("Albums", Key.of(1, 1), Arrays.asList("MarketingBudget"))
                  .getLong(0);
          album1Budget += transfer;
          album2Budget -= transfer;
          transaction.buffer(
              Mutation.newUpdateBuilder("Albums")
                  .set("SingerId")
                  .to(1)
                  .set("AlbumId")
                  .to(1)
                  .set("MarketingBudget")
                  .to(album1Budget)
                  .build());
          transaction.buffer(
              Mutation.newUpdateBuilder("Albums")
                  .set("SingerId")
                  .to(2)
                  .set("AlbumId")
                  .to(2)
                  .set("MarketingBudget")
                  .to(album2Budget)
                  .build());
        }
        return null;
      });
}

Node.js

// This sample transfers 200,000 from the MarketingBudget field
// of the second Album to the first Album, as long as the second
// Album has enough money in its budget. Make sure to run the
// addColumn and updateData samples first (in that order).

// Imports the Google Cloud client library
const {Spanner} = require('@google-cloud/spanner');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// const projectId = 'my-project-id';
// const instanceId = 'my-instance';
// const databaseId = 'my-database';

// Creates a client
const spanner = new Spanner({
  projectId: projectId,
});

// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);

const transferAmount = 200000;

database.runTransaction(async (err, transaction) => {
  if (err) {
    console.error(err);
    return;
  }
  let firstBudget, secondBudget;
  const queryOne = {
    columns: ['MarketingBudget'],
    keys: [[2, 2]], // SingerId: 2, AlbumId: 2
  };

  const queryTwo = {
    columns: ['MarketingBudget'],
    keys: [[1, 1]], // SingerId: 1, AlbumId: 1
  };

  Promise.all([
    // Reads the second album's budget
    transaction.read('Albums', queryOne).then(results => {
      // Gets second album's budget
      const rows = results[0].map(row => row.toJSON());
      secondBudget = rows[0].MarketingBudget;
      console.log(`The second album's marketing budget: ${secondBudget}`);

      // Makes sure the second album's budget is large enough
      if (secondBudget < transferAmount) {
        throw new Error(
          `The second album's budget (${secondBudget}) is less than the transfer amount (${transferAmount}).`,
        );
      }
    }),

    // Reads the first album's budget
    transaction.read('Albums', queryTwo).then(results => {
      // Gets first album's budget
      const rows = results[0].map(row => row.toJSON());
      firstBudget = rows[0].MarketingBudget;
      console.log(`The first album's marketing budget: ${firstBudget}`);
    }),
  ])
    .then(() => {
      console.log(firstBudget, secondBudget);
      // Transfers the budgets between the albums
      firstBudget += transferAmount;
      secondBudget -= transferAmount;

      console.log(firstBudget, secondBudget);

      // Updates the database
      // Note: Cloud Spanner interprets Node.js numbers as FLOAT64s, so they
      // must be converted (back) to strings before being inserted as INT64s.
      transaction.update('Albums', [
        {
          SingerId: '1',
          AlbumId: '1',
          MarketingBudget: firstBudget.toString(),
        },
        {
          SingerId: '2',
          AlbumId: '2',
          MarketingBudget: secondBudget.toString(),
        },
      ]);
    })
    .then(() => {
      // Commits the transaction and send the changes to the database
      return transaction.commit();
    })
    .then(() => {
      console.log(
        `Successfully executed read-write transaction to transfer ${transferAmount} from Album 2 to Album 1.`,
      );
    })
    .catch(err => {
      console.error('ERROR:', err);
    })
    .then(() => {
      transaction.end();
      // Closes the database when finished
      return database.close();
    });
});

PHP

use Google\Cloud\Spanner\SpannerClient;
use Google\Cloud\Spanner\Transaction;
use UnexpectedValueException;

/**
 * Performs a read-write transaction to update two sample records in the
 * database.
 *
 * This will transfer 200,000 from the `MarketingBudget` field for the second
 * Album to the first Album. If the `MarketingBudget` for the second Album is
 * too low, it will raise an exception.
 *
 * Before running this sample, you will need to run the `update_data` sample
 * to populate the fields.
 * Example:
 * ```
 * read_write_transaction($instanceId, $databaseId);
 * ```
 *
 * @param string $instanceId The Spanner instance ID.
 * @param string $databaseId The Spanner database ID.
 */
function read_write_transaction(string $instanceId, string $databaseId): void
{
    $spanner = new SpannerClient();
    $instance = $spanner->instance($instanceId);
    $database = $instance->database($databaseId);

    $database->runTransaction(function (Transaction $t) use ($spanner) {
        $transferAmount = 200000;

        // Read the second album's budget.
        $secondAlbumKey = [2, 2];
        $secondAlbumKeySet = $spanner->keySet(['keys' => [$secondAlbumKey]]);
        $secondAlbumResult = $t->read(
            'Albums',
            $secondAlbumKeySet,
            ['MarketingBudget'],
            ['limit' => 1]
        );

        $firstRow = $secondAlbumResult->rows()->current();
        $secondAlbumBudget = $firstRow['MarketingBudget'];
        if ($secondAlbumBudget < $transferAmount) {
            // Throwing an exception will automatically roll back the transaction.
            throw new UnexpectedValueException(
                'The second album\'s budget is lower than the transfer amount: ' . $transferAmount
            );
        }

        $firstAlbumKey = [1, 1];
        $firstAlbumKeySet = $spanner->keySet(['keys' => [$firstAlbumKey]]);
        $firstAlbumResult = $t->read(
            'Albums',
            $firstAlbumKeySet,
            ['MarketingBudget'],
            ['limit' => 1]
        );

        // Read the first album's budget.
        $firstRow = $firstAlbumResult->rows()->current();
        $firstAlbumBudget = $firstRow['MarketingBudget'];

        // Update the budgets.
        $secondAlbumBudget -= $transferAmount;
        $firstAlbumBudget += $transferAmount;
        printf('Setting first album\'s budget to %s and the second album\'s ' .
            'budget to %s.' . PHP_EOL, $firstAlbumBudget, $secondAlbumBudget);

        // Update the rows.
        $t->updateBatch('Albums', [
            ['SingerId' => 1, 'AlbumId' => 1, 'MarketingBudget' => $firstAlbumBudget],
            ['SingerId' => 2, 'AlbumId' => 2, 'MarketingBudget' => $secondAlbumBudget],
        ]);

        // Commit the transaction!
        $t->commit();

        print('Transaction complete.' . PHP_EOL);
    });
}

Python

def read_write_transaction(instance_id, database_id):
    """Performs a read-write transaction to update two sample records in the
    database.

    This will transfer 200,000 from the `MarketingBudget` field for the second
    Album to the first Album. If the `MarketingBudget` is too low, it will
    raise an exception.

    Before running this sample, you will need to run the `update_data` sample
    to populate the fields.
    """
    spanner_client = spanner.Client()
    instance = spanner_client.instance(instance_id)
    database = instance.database(database_id)

    def update_albums(transaction):
        # Read the second album budget.
        second_album_keyset = spanner.KeySet(keys=[(2, 2)])
        second_album_result = transaction.read(
            table="Albums",
            columns=("MarketingBudget",),
            keyset=second_album_keyset,
            limit=1,
        )
        second_album_row = list(second_album_result)[0]
        second_album_budget = second_album_row[0]

        transfer_amount = 200000

        if second_album_budget < transfer_amount:
            # Raising an exception will automatically roll back the
            # transaction.
            raise ValueError("The second album doesn't have enough funds to transfer")

        # Read the first album's budget.
        first_album_keyset = spanner.KeySet(keys=[(1, 1)])
        first_album_result = transaction.read(
            table="Albums",
            columns=("MarketingBudget",),
            keyset=first_album_keyset,
            limit=1,
        )
        first_album_row = list(first_album_result)[0]
        first_album_budget = first_album_row[0]

        # Update the budgets.
        second_album_budget -= transfer_amount
        first_album_budget += transfer_amount
        print(
            "Setting first album's budget to {} and the second album's "
            "budget to {}.".format(first_album_budget, second_album_budget)
        )

        # Update the rows.
        transaction.update(
            table="Albums",
            columns=("SingerId", "AlbumId", "MarketingBudget"),
            values=[(1, 1, first_album_budget), (2, 2, second_album_budget)],
        )

    database.run_in_transaction(update_albums)

    print("Transaction complete.")

Ruby

# project_id  = "Your Google Cloud project ID"
# instance_id = "Your Spanner instance ID"
# database_id = "Your Spanner database ID"

require "google/cloud/spanner"

spanner         = Google::Cloud::Spanner.new project: project_id
client          = spanner.client instance_id, database_id
transfer_amount = 200_000

client.transaction do |transaction|
  first_album  = transaction.read("Albums", [:MarketingBudget], keys: [[1, 1]]).rows.first
  second_album = transaction.read("Albums", [:MarketingBudget], keys: [[2, 2]]).rows.first

  raise "The second album does not have enough funds to transfer" if second_album[:MarketingBudget] < transfer_amount

  new_first_album_budget  = first_album[:MarketingBudget] + transfer_amount
  new_second_album_budget = second_album[:MarketingBudget] - transfer_amount

  transaction.update "Albums", [
    { SingerId: 1, AlbumId: 1, MarketingBudget: new_first_album_budget  },
    { SingerId: 2, AlbumId: 2, MarketingBudget: new_second_album_budget }
  ]
end

puts "Transaction complete"

語意

本節說明 Spanner 中讀寫交易的語意。

屬性

Spanner 中的讀寫交易會以不可分割的形式執行一組讀寫作業。讀寫交易執行的時間戳記會與經過時間吻合。序列化順序會符合這個時間戳記順序。

讀寫交易會提供關聯資料庫的 ACID 屬性。Spanner 讀寫交易提供的屬性比一般 ACID 更強大。

由於有這些屬性,做為應用程式開發商,您可以著重在每個交易自身的正確性,不須擔心如何保護自己的執行作業,以免遭到同時間執行的其他交易覆寫。

讀寫交易的隔離層級

成功提交包含一系列讀取和寫入作業的交易後,您會看到下列情況:

  • 交易會傳回的值,反映交易提交時間戳記的穩定快照。
  • 在提交時,空白資料列或範圍仍會保持空白。
  • 交易會在交易的修訂時間戳記修訂所有寫入作業。
  • 交易修訂後,交易才能看到寫入作業。

Spanner 用戶端驅動程式包含交易重試邏輯,可重新執行交易並驗證用戶端觀察到的資料,藉此遮蓋暫時性錯誤。

效果就是從交易本身或從 Spanner 資料庫的其他讀取者和寫入者看來,所有的讀取和寫入看似在單一時間點發生。也就是說,讀取和寫入作業會在相同的時間戳記發生。如需範例,請參閱「可序列化與外部一致性」。

讀取交易的隔離

如果讀寫交易只執行讀取作業,提供的資料一致性保證與唯讀交易類似。交易中的所有讀取作業都會傳回一致的時間戳記,包括確認不存在的資料列。

其中一個差異是,讀寫交易在未執行寫入作業的情況下修訂。在這種情況下,無法保證在讀取作業和交易修訂之間,資料庫中讀取的資料不會變更。

為確保資料為最新狀態,並驗證資料自上次擷取後未經修改,您必須再次讀取資料。您可以在另一個讀寫交易中或透過強式讀取執行這項重新讀取作業。

如要達到最佳效率,如果交易只執行讀取作業,請使用唯讀交易,而非讀寫交易。

不可分割性、一致性、永久性

除了隔離性之外,Spanner 還提供其他 ACID 屬性保證:

  • 原子性。如果交易中的所有作業都順利完成,或完全沒有作業,則該交易視為不可分割。如果交易中的任何作業失敗,系統會將整筆交易回復至原始狀態,確保資料完整性。
  • 持續上傳。交易必須維護資料庫規則和限制的完整性。交易完成後,資料庫應處於有效狀態,並遵守預先定義的規則。
  • 耐用性。交易修訂後,變更會永久儲存在資料庫中,即使發生系統故障、停電或其他中斷情況,變更也不會消失。

可序列化與外部一致性

Spanner 提供強大的交易保證,包括可序列化外部一致性。這些屬性可確保資料保持一致,且作業會以可預測的順序執行,即使在分散式環境中也是如此。

可序列化可確保所有交易看起來會依序執行,即使是並行處理也一樣。Spanner 會為交易指派修訂時間戳記,反映出交易的修訂順序,藉此達成這項目標。

Spanner 提供更強大的保證,稱為外部一致性。也就是說,交易不僅會依修訂時間戳記反映的順序修訂,這些時間戳記也會與實際時間一致。您可以比較提交時間戳記與實際時間,以全域一致的順序查看資料。

基本上,如果交易 Txn1 在另一個交易 Txn2 之前即時提交,則 Txn1 的提交時間戳記會早於 Txn2 的提交時間戳記。

請見如下範例:

顯示執行兩次皆讀取相同資料的交易的時間軸

在此情境中,時間軸 t 期間會發生下列情況:

  • 交易 Txn1 會讀取資料 A,暫存寫入 A 的作業,然後成功修訂。
  • 交易會在 Txn1 啟動後Txn2開始。它會先讀取資料 B,然後讀取資料 A

即使 Txn2 在 Txn1 完成前啟動,Txn2 仍會觀察 Txn1A 所做的變更。這是因為 Txn1 將寫入作業提交至 A 後,Txn2 會讀取 A

Txn1Txn2 的執行時間可能會重疊,但其提交時間戳記 (分別為 c1c2) 會強制執行線性交易順序。這表示:

  • Txn1 內的所有讀取和寫入作業,看似都在單一時間點 c1 發生。
  • Txn2 內的所有讀取和寫入作業,看似都在單一時間點 c2 發生。
  • 重要的一點是,即使寫入作業是在不同機器上執行,c1 也會早於 c2。如果 Txn2 只執行讀取作業,則 c1 會早於或等於 c2

這種嚴格排序方式表示,如果後續的讀取作業觀察到 Txn2 的效果,也會觀察到 Txn1 的效果。所有成功修訂的交易,這個屬性都會保留 true 值。

交易失敗時的讀取與寫入保證

如果執行交易的呼叫失敗,根據讓修訂呼叫失敗的錯誤,您的讀取與寫入保證會有所不同。

例如,「找不到資料列」或「資料列已存在」錯誤表示撰寫緩衝的變異遭遇某種錯誤,例如用戶端嘗試更新的資料列不存在。在這種情況下,讀取作業保證是一致的,寫入作業尚未套用,不存在的資料列保證會與讀取作業一致。

交易失敗時的讀取與寫入保證

如果 Spanner 交易失敗,您在讀取和寫入時獲得的保證,取決於 commit 作業期間發生的特定錯誤。

舉例來說,「找不到資料列」或「資料列已存在」等錯誤訊息,表示在寫入緩衝變異時發生問題。舉例來說,如果用戶端嘗試更新的資料列不存在,就可能發生這種情況。在這些情況下:

  • 讀取作業一致:保證交易期間讀取的任何資料,在發生錯誤前都具有一致性。
  • 未套用寫入:交易嘗試的變異不會提交至資料庫。
  • 資料列一致性:觸發錯誤的資料列不存在 (或存在) 的狀態,與交易中執行的讀取作業一致。

您可以隨時取消 Spanner 中的非同步讀取作業,而不影響同一交易中的其他進行中作業。如果系統取消高階作業,或您根據初始結果決定中止讀取作業,這種彈性就很有用。

不過,請務必瞭解,要求取消讀取作業不保證會立即終止作業。取消要求後,讀取作業可能仍會:

  • 順利完成:讀取作業可能會在取消生效前完成處理並傳回結果。
  • 因其他原因而失敗:讀取作業可能會因其他錯誤 (例如中止) 而終止。
  • 傳回不完整的結果:讀取作業可能會傳回部分結果,這些結果會在交易提交程序中進行驗證。

此外,也請注意與交易 commit 作業的區別:取消 commit 會中止整個交易,除非交易已修訂或因其他原因而失敗。

成效

本節說明會影響讀寫交易效能的問題。

鎖定並行控制

Spanner 允許多個用戶端同時與相同資料庫互動。為確保這些並行交易的資料一致性,Spanner 採用鎖定機制,同時使用共用和專屬鎖定。

交易執行讀取作業時,Spanner 會取得相關資料的共用讀取鎖定。其他並行讀取作業可透過這些共用鎖定存取相同資料。這項並行作業會持續進行,直到交易準備好要提交變更為止。

在提交階段,當系統套用寫入作業時,交易會嘗試將鎖定升級為專屬鎖定。為達成此目標,這項功能會執行下列操作:

  • 封鎖受影響資料上的任何新共用讀取鎖定要求。
  • 等待該資料上所有現有的共用讀取鎖定釋出。
  • 清除所有共用讀取鎖定後,系統會實行專屬鎖定,在寫入期間授予專屬資料存取權。

鎖定的相關注意事項:

  • 精細度:Spanner 會在資料列和資料欄精細度套用鎖定。也就是說,如果交易 T1 持有資料列 albumid 的資料欄 A 鎖定,交易 T2 仍可同時寫入同一資料列 albumid 的資料欄 B,不會發生衝突。
  • 不讀取就寫入:如果不讀取就寫入,Spanner 不需要專屬鎖定。而是會使用寫入者共用鎖定。這是因為沒有讀取作業的寫入申請順序是由提交時間戳記決定,因此多個寫入者可以同時對同一項目執行作業,不會發生衝突。只有在交易先讀取要寫入的資料時,才需要專屬鎖定。
  • 資料列查詢的次要索引:在讀寫交易中執行資料列查詢時,使用次要索引可大幅提升效能。使用次要索引將掃描的資料列限制在較小的範圍內,Spanner 鎖定的資料列就會較少,因此可對該特定範圍外的資料列進行更多並行修改。
  • 外部資源專屬存取權:Spanner 的內部鎖定機制專為確保 Spanner 資料庫本身的資料一致性而設計。請勿使用這些鎖定,確保專屬存取 Spanner 以外的資源。Spanner 可能會因各種原因中止交易,包括內部系統最佳化 (例如跨運算資源移動資料)。如果系統重試交易 (由應用程式程式碼明確重試,或由 Spanner JDBC 驅動程式等用戶端程式庫隱含重試),只有在成功嘗試提交時,才能保證持有鎖定。
  • 鎖定統計資料:如要診斷及調查資料庫中的鎖定衝突,可以使用鎖定統計資料內省工具。

鎖死偵測

Spanner 會偵測可能發生多筆交易鎖死的情形,並強制取消所有交易,僅保留一筆交易。請考量以下情境: Txn1持有記錄 A 的鎖定,並等待記錄 B 的鎖定,而 Txn2持有記錄 B 的鎖定,並等待記錄 A 的鎖定。如要解決這個問題,其中一項交易必須中止,釋放鎖定,讓另一項交易繼續進行。

Spanner 會使用標準的「受傷等待」演算法偵測死結。在幕後,Spanner 會追蹤要求衝突鎖定的每筆交易的年齡。這項功能可讓較舊的交易中止較新的交易。較舊的交易是指最早的讀取、查詢或修訂發生時間較早的交易。

Spanner 會優先處理時間較長的交易,確保每筆交易最後都會因為時間夠長而有機會優先取得鎖定。舉例來說,如果時間較長的交易需要寫入者共用鎖定,可以取消具有讀取者共用鎖定的時間較短交易。

分散式執行

Spanner 可以在跨多部伺服器的資料上執行交易,但與單一伺服器交易相比,這項功能會產生效能成本。

哪些類型的交易可能會分配到多個帳戶?Spanner 可將資料庫資料列分配給多台伺服器。通常,資料列和對應的交錯式資料表資料列是由相同伺服器提供服務,同個資料表中具有鄰近索引鍵的兩個資料列也是如此。Spanner 可以在不同伺服器上執行跨資料列的交易。不過,一般來說,與影響資料庫或大型資料表中許多資料列的交易相比,影響許多同一位置資料列的交易通常會比較快也比較便宜。

Spanner 中最有效率的交易只包含應以不可分割形式套用的讀寫作業。當所有讀取和寫入作業存取索引鍵空間的同一部分資料時,交易速度最快。

唯讀交易

除了鎖定讀寫交易之外,Spanner 也提供唯讀交易。

當您需要在同一個時間戳記執行超過一個讀取作業時,請使用唯讀交易。如果您可以使用 Spanner 的單一讀取方法表示讀取作業,便應改用該單一取方法。使用這種單一讀取呼叫的效能,與在唯讀交易中進行單一讀取的效能應大致相等。

如果讀取大量資料,請考慮使用分區平行讀取資料

由於唯讀交易無法寫入,這些交易無法鎖定作業,也無法禁止其他交易。唯讀交易會觀察交易修訂記錄中一致的前置字串,讓應用程式永遠取得一致的資料。

介面

Spanner 提供介面,可在唯讀交易的背景中執行作業主體,並在交易取消時重試。

範例

以下範例說明如何使用唯讀交易,在同個時間戳記的兩次讀取作業取得一致資料:

C++

void ReadOnlyTransaction(google::cloud::spanner::Client client) {
  namespace spanner = ::google::cloud::spanner;
  auto read_only = spanner::MakeReadOnlyTransaction();

  spanner::SqlStatement select(
      "SELECT SingerId, AlbumId, AlbumTitle FROM Albums");
  using RowType = std::tuple<std::int64_t, std::int64_t, std::string>;

  // Read#1.
  auto rows1 = client.ExecuteQuery(read_only, select);
  std::cout << "Read 1 results\n";
  for (auto& row : spanner::StreamOf<RowType>(rows1)) {
    if (!row) throw std::move(row).status();
    std::cout << "SingerId: " << std::get<0>(*row)
              << " AlbumId: " << std::get<1>(*row)
              << " AlbumTitle: " << std::get<2>(*row) << "\n";
  }
  // Read#2. Even if changes occur in-between the reads the transaction ensures
  // that Read #1 and Read #2 return the same data.
  auto rows2 = client.ExecuteQuery(read_only, select);
  std::cout << "Read 2 results\n";
  for (auto& row : spanner::StreamOf<RowType>(rows2)) {
    if (!row) throw std::move(row).status();
    std::cout << "SingerId: " << std::get<0>(*row)
              << " AlbumId: " << std::get<1>(*row)
              << " AlbumTitle: " << std::get<2>(*row) << "\n";
  }
}

C#


using Google.Cloud.Spanner.Data;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Transactions;

public class QueryDataWithTransactionAsyncSample
{
    public class Album
    {
        public int SingerId { get; set; }
        public int AlbumId { get; set; }
        public string AlbumTitle { get; set; }
    }

    public async Task<List<Album>> QueryDataWithTransactionAsync(string projectId, string instanceId, string databaseId)
    {
        string connectionString = $"Data Source=projects/{projectId}/instances/{instanceId}/databases/{databaseId}";

        var albums = new List<Album>();
        using TransactionScope scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled);
        using var connection = new SpannerConnection(connectionString);

        // Opens the connection so that the Spanner transaction included in the TransactionScope
        // is read-only TimestampBound.Strong.
        await connection.OpenAsync(SpannerTransactionCreationOptions.ReadOnly, options: null, cancellationToken: default);
        using var cmd = connection.CreateSelectCommand("SELECT SingerId, AlbumId, AlbumTitle FROM Albums");

        // Read #1.
        using (var reader = await cmd.ExecuteReaderAsync())
        {
            while (await reader.ReadAsync())
            {
                Console.WriteLine("SingerId : " + reader.GetFieldValue<string>("SingerId")
                    + " AlbumId : " + reader.GetFieldValue<string>("AlbumId")
                    + " AlbumTitle : " + reader.GetFieldValue<string>("AlbumTitle"));
            }
        }

        // Read #2. Even if changes occur in-between the reads,
        // the transaction ensures that Read #1 and Read #2
        // return the same data.
        using (var reader = await cmd.ExecuteReaderAsync())
        {
            while (await reader.ReadAsync())
            {
                albums.Add(new Album
                {
                    AlbumId = reader.GetFieldValue<int>("AlbumId"),
                    SingerId = reader.GetFieldValue<int>("SingerId"),
                    AlbumTitle = reader.GetFieldValue<string>("AlbumTitle")
                });
            }
        }
        scope.Complete();
        Console.WriteLine("Transaction complete.");
        return albums;
    }
}

Go


import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/spanner"
	"google.golang.org/api/iterator"
)

func readOnlyTransaction(w io.Writer, db string) error {
	ctx := context.Background()
	client, err := spanner.NewClient(ctx, db)
	if err != nil {
		return err
	}
	defer client.Close()

	ro := client.ReadOnlyTransaction()
	defer ro.Close()
	stmt := spanner.Statement{SQL: `SELECT SingerId, AlbumId, AlbumTitle FROM Albums`}
	iter := ro.Query(ctx, stmt)
	defer iter.Stop()
	for {
		row, err := iter.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			return err
		}
		var singerID int64
		var albumID int64
		var albumTitle string
		if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil {
			return err
		}
		fmt.Fprintf(w, "%d %d %s\n", singerID, albumID, albumTitle)
	}

	iter = ro.Read(ctx, "Albums", spanner.AllKeys(), []string{"SingerId", "AlbumId", "AlbumTitle"})
	defer iter.Stop()
	for {
		row, err := iter.Next()
		if err == iterator.Done {
			return nil
		}
		if err != nil {
			return err
		}
		var singerID int64
		var albumID int64
		var albumTitle string
		if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil {
			return err
		}
		fmt.Fprintf(w, "%d %d %s\n", singerID, albumID, albumTitle)
	}
}

Java

static void readOnlyTransaction(DatabaseClient dbClient) {
  // ReadOnlyTransaction must be closed by calling close() on it to release resources held by it.
  // We use a try-with-resource block to automatically do so.
  try (ReadOnlyTransaction transaction = dbClient.readOnlyTransaction()) {
    try (ResultSet queryResultSet =
        transaction.executeQuery(
            Statement.of("SELECT SingerId, AlbumId, AlbumTitle FROM Albums"))) {
      while (queryResultSet.next()) {
        System.out.printf(
            "%d %d %s\n",
            queryResultSet.getLong(0), queryResultSet.getLong(1), queryResultSet.getString(2));
      }
    } // queryResultSet.close() is automatically called here
    try (ResultSet readResultSet =
        transaction.read(
          "Albums", KeySet.all(), Arrays.asList("SingerId", "AlbumId", "AlbumTitle"))) {
      while (readResultSet.next()) {
        System.out.printf(
            "%d %d %s\n",
            readResultSet.getLong(0), readResultSet.getLong(1), readResultSet.getString(2));
      }
    } // readResultSet.close() is automatically called here
  } // transaction.close() is automatically called here
}

Node.js

// Imports the Google Cloud client library
const {Spanner} = require('@google-cloud/spanner');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// const projectId = 'my-project-id';
// const instanceId = 'my-instance';
// const databaseId = 'my-database';

// Creates a client
const spanner = new Spanner({
  projectId: projectId,
});

// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);

// Gets a transaction object that captures the database state
// at a specific point in time
database.getSnapshot(async (err, transaction) => {
  if (err) {
    console.error(err);
    return;
  }
  const queryOne = 'SELECT SingerId, AlbumId, AlbumTitle FROM Albums';

  try {
    // Read #1, using SQL
    const [qOneRows] = await transaction.run(queryOne);

    qOneRows.forEach(row => {
      const json = row.toJSON();
      console.log(
        `SingerId: ${json.SingerId}, AlbumId: ${json.AlbumId}, AlbumTitle: ${json.AlbumTitle}`,
      );
    });

    const queryTwo = {
      columns: ['SingerId', 'AlbumId', 'AlbumTitle'],
    };

    // Read #2, using the `read` method. Even if changes occur
    // in-between the reads, the transaction ensures that both
    // return the same data.
    const [qTwoRows] = await transaction.read('Albums', queryTwo);

    qTwoRows.forEach(row => {
      const json = row.toJSON();
      console.log(
        `SingerId: ${json.SingerId}, AlbumId: ${json.AlbumId}, AlbumTitle: ${json.AlbumTitle}`,
      );
    });

    console.log('Successfully executed read-only transaction.');
  } catch (err) {
    console.error('ERROR:', err);
  } finally {
    transaction.end();
    // Close the database when finished.
    await database.close();
  }
});

PHP

use Google\Cloud\Spanner\SpannerClient;

/**
 * Reads data inside of a read-only transaction.
 *
 * Within the read-only transaction, or "snapshot", the application sees
 * consistent view of the database at a particular timestamp.
 * Example:
 * ```
 * read_only_transaction($instanceId, $databaseId);
 * ```
 *
 * @param string $instanceId The Spanner instance ID.
 * @param string $databaseId The Spanner database ID.
 */
function read_only_transaction(string $instanceId, string $databaseId): void
{
    $spanner = new SpannerClient();
    $instance = $spanner->instance($instanceId);
    $database = $instance->database($databaseId);

    $snapshot = $database->snapshot();
    $results = $snapshot->execute(
        'SELECT SingerId, AlbumId, AlbumTitle FROM Albums'
    );
    print('Results from the first read:' . PHP_EOL);
    foreach ($results as $row) {
        printf('SingerId: %s, AlbumId: %s, AlbumTitle: %s' . PHP_EOL,
            $row['SingerId'], $row['AlbumId'], $row['AlbumTitle']);
    }

    // Perform another read using the `read` method. Even if the data
    // is updated in-between the reads, the snapshot ensures that both
    // return the same data.
    $keySet = $spanner->keySet(['all' => true]);
    $results = $database->read(
        'Albums',
        $keySet,
        ['SingerId', 'AlbumId', 'AlbumTitle']
    );

    print('Results from the second read:' . PHP_EOL);
    foreach ($results->rows() as $row) {
        printf('SingerId: %s, AlbumId: %s, AlbumTitle: %s' . PHP_EOL,
            $row['SingerId'], $row['AlbumId'], $row['AlbumTitle']);
    }
}

Python

def read_only_transaction(instance_id, database_id):
    """Reads data inside of a read-only transaction.

    Within the read-only transaction, or "snapshot", the application sees
    consistent view of the database at a particular timestamp.
    """
    spanner_client = spanner.Client()
    instance = spanner_client.instance(instance_id)
    database = instance.database(database_id)

    with database.snapshot(multi_use=True) as snapshot:
        # Read using SQL.
        results = snapshot.execute_sql(
            "SELECT SingerId, AlbumId, AlbumTitle FROM Albums"
        )

        print("Results from first read:")
        for row in results:
            print("SingerId: {}, AlbumId: {}, AlbumTitle: {}".format(*row))

        # Perform another read using the `read` method. Even if the data
        # is updated in-between the reads, the snapshot ensures that both
        # return the same data.
        keyset = spanner.KeySet(all_=True)
        results = snapshot.read(
            table="Albums", columns=("SingerId", "AlbumId", "AlbumTitle"), keyset=keyset
        )

        print("Results from second read:")
        for row in results:
            print("SingerId: {}, AlbumId: {}, AlbumTitle: {}".format(*row))

Ruby

# project_id  = "Your Google Cloud project ID"
# instance_id = "Your Spanner instance ID"
# database_id = "Your Spanner database ID"

require "google/cloud/spanner"

spanner = Google::Cloud::Spanner.new project: project_id
client  = spanner.client instance_id, database_id

client.snapshot do |snapshot|
  snapshot.execute("SELECT SingerId, AlbumId, AlbumTitle FROM Albums").rows.each do |row|
    puts "#{row[:AlbumId]} #{row[:AlbumTitle]} #{row[:SingerId]}"
  end

  # Even if changes occur in-between the reads, the transaction ensures that
  # both return the same data.
  snapshot.read("Albums", [:AlbumId, :AlbumTitle, :SingerId]).rows.each do |row|
    puts "#{row[:AlbumId]} #{row[:AlbumTitle]} #{row[:SingerId]}"
  end
end

語意

本節說明唯讀交易的語意。

快照唯讀交易

在 Spanner 中執行唯讀交易時,系統會在單一邏輯時點執行所有讀取作業。也就是說,唯讀交易和任何其他並行讀取者和寫入者,都會看到該特定時間點的資料庫一致性快照。

相較於鎖定讀寫交易,這些快照唯讀交易提供更簡單的方法,確保讀取作業的一致性。原因如下:

  • 不鎖定:唯讀交易不會取得鎖定。而是選取 Spanner 時間戳記,並對該資料的歷來版本執行所有讀取作業。由於唯讀交易不會使用鎖定,因此不會封鎖並行的讀寫交易。
  • 不會中止:這類交易絕不會中止。如果所選讀取時間戳記遭到垃圾收集,這些作業可能會失敗,但 Spanner 的預設垃圾收集政策通常相當寬鬆,因此大多數應用程式不會遇到這個問題。
  • 不提交或回溯:唯讀交易不需要呼叫 sessions.commitsessions.rollback,實際上也無法呼叫。

如要執行快照交易,用戶端會定義時間戳記範圍,指示 Spanner 如何選取讀取時間戳記。時間戳記界限類型包括:

  • 強式讀取:這類讀取作業可確保您看到讀取作業開始前,所有已確認交易的效果。單一讀取作業中的所有資料列都一致。不過,強式讀取無法重複,雖然強式讀取會傳回時間戳記,但再次讀取相同時間戳記的資料則可重複。兩筆連續的強式唯讀交易若有並行的寫入作業,可能會產生不同的結果。變更串流的查詢必須使用這個界限。詳情請參閱 TransactionOptions.ReadOnly.strong
  • 精確的過時程度:這個選項會以您指定的時間戳記執行讀取作業,時間戳記可以是絕對時間戳記,也可以是相對於目前時間的過時程度。這可確保您觀察到該時間戳記之前的一致全域交易記錄前置字元,並封鎖可能以少於或等於讀取時間戳記的時間戳記認可的衝突交易。雖然比限定的過時程度模式稍快,但可能會傳回較舊的資料。詳情請參閱 TransactionOptions.ReadOnly.read_timestampTransactionOptions.ReadOnly.exact_staleness
  • 限定的過時程度:Spanner 會在使用者定義的過時程度限制內,選取最新的時間戳記,以在最近可用的備用資源執行作業,不會遭到封鎖。傳回的所有資料列都一致。與強讀取作業一樣,限定的過時程度讀取作業不可重複,因為即使使用相同的界限,不同的讀取作業也可能在不同的時間戳記執行。這類讀取作業會分兩個階段執行 (時間戳記協商,然後讀取),通常會比精確的過時程度慢一點,但通常會傳回較新的結果,且更有可能在本地副本執行。這個模式僅適用於單次使用的唯讀交易,因為時間戳記協商需要事先知道要讀取哪些資料列。詳情請參閱 TransactionOptions.ReadOnly.max_stalenessTransactionOptions.ReadOnly.min_read_timestamp

分區 DML 交易

您可以使用分區 DML 執行大規模的 UPDATEDELETE 陳述式,便不會遇到交易限制或整個資料表遭到鎖定的情況。Spanner 會將索引鍵空間分區,然後在個別的讀寫交易中,在每個分區執行 DML 陳述式。

如要使用非分區 DML,請在程式碼中明確建立的讀寫交易中執行陳述式。詳情請參閱「使用 DML」。

介面

Spanner 提供 TransactionOptions.partitionedDml 介面,用於執行單一分區 DML 陳述式。

範例

以下程式碼範例會更新 Albums 資料表的 MarketingBudget 資料欄。

C++

使用 ExecutePartitionedDml() 函式執行分區 DML 陳述式。

void DmlPartitionedUpdate(google::cloud::spanner::Client client) {
  namespace spanner = ::google::cloud::spanner;
  auto result = client.ExecutePartitionedDml(
      spanner::SqlStatement("UPDATE Albums SET MarketingBudget = 100000"
                            "  WHERE SingerId > 1"));
  if (!result) throw std::move(result).status();
  std::cout << "Updated at least " << result->row_count_lower_bound
            << " row(s) [spanner_dml_partitioned_update]\n";
}

C#

使用 ExecutePartitionedUpdateAsync() 方法執行分區 DML 陳述式。


using Google.Cloud.Spanner.Data;
using System;
using System.Threading.Tasks;

public class UpdateUsingPartitionedDmlCoreAsyncSample
{
    public async Task<long> UpdateUsingPartitionedDmlCoreAsync(string projectId, string instanceId, string databaseId)
    {
        string connectionString = $"Data Source=projects/{projectId}/instances/{instanceId}/databases/{databaseId}";

        using var connection = new SpannerConnection(connectionString);
        await connection.OpenAsync();

        using var cmd = connection.CreateDmlCommand("UPDATE Albums SET MarketingBudget = 100000 WHERE SingerId > 1");
        long rowCount = await cmd.ExecutePartitionedUpdateAsync();

        Console.WriteLine($"{rowCount} row(s) updated...");
        return rowCount;
    }
}

Go

使用 PartitionedUpdate() 方法執行分區 DML 陳述式。


import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/spanner"
)

func updateUsingPartitionedDML(w io.Writer, db string) error {
	ctx := context.Background()
	client, err := spanner.NewClient(ctx, db)
	if err != nil {
		return err
	}
	defer client.Close()

	stmt := spanner.Statement{SQL: "UPDATE Albums SET MarketingBudget = 100000 WHERE SingerId > 1"}
	rowCount, err := client.PartitionedUpdate(ctx, stmt)
	if err != nil {
		return err
	}
	fmt.Fprintf(w, "%d record(s) updated.\n", rowCount)
	return nil
}

Java

使用 executePartitionedUpdate() 方法執行分區 DML 陳述式。

static void updateUsingPartitionedDml(DatabaseClient dbClient) {
  String sql = "UPDATE Albums SET MarketingBudget = 100000 WHERE SingerId > 1";
  long rowCount = dbClient.executePartitionedUpdate(Statement.of(sql));
  System.out.printf("%d records updated.\n", rowCount);
}

Node.js

使用 runPartitionedUpdate() 方法執行分區 DML 陳述式。

// Imports the Google Cloud client library
const {Spanner} = require('@google-cloud/spanner');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// const projectId = 'my-project-id';
// const instanceId = 'my-instance';
// const databaseId = 'my-database';

// Creates a client
const spanner = new Spanner({
  projectId: projectId,
});

// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);

try {
  const [rowCount] = await database.runPartitionedUpdate({
    sql: 'UPDATE Albums SET MarketingBudget = 100000 WHERE SingerId > 1',
  });
  console.log(`Successfully updated ${rowCount} records.`);
} catch (err) {
  console.error('ERROR:', err);
} finally {
  // Close the database when finished.
  database.close();
}

PHP

使用 executePartitionedUpdate() 方法執行分區 DML 陳述式。

use Google\Cloud\Spanner\SpannerClient;

/**
 * Updates sample data in the database by partition with a DML statement.
 *
 * This updates the `MarketingBudget` column which must be created before
 * running this sample. You can add the column by running the `add_column`
 * sample or by running this DDL statement against your database:
 *
 *     ALTER TABLE Albums ADD COLUMN MarketingBudget INT64
 *
 * Example:
 * ```
 * update_data($instanceId, $databaseId);
 * ```
 *
 * @param string $instanceId The Spanner instance ID.
 * @param string $databaseId The Spanner database ID.
 */
function update_data_with_partitioned_dml(string $instanceId, string $databaseId): void
{
    $spanner = new SpannerClient();
    $instance = $spanner->instance($instanceId);
    $database = $instance->database($databaseId);

    $rowCount = $database->executePartitionedUpdate(
        'UPDATE Albums SET MarketingBudget = 100000 WHERE SingerId > 1'
    );

    printf('Updated %d row(s).' . PHP_EOL, $rowCount);
}

Python

使用 execute_partitioned_dml() 方法執行分區 DML 陳述式。

# instance_id = "your-spanner-instance"
# database_id = "your-spanner-db-id"

spanner_client = spanner.Client()
instance = spanner_client.instance(instance_id)
database = instance.database(database_id)

row_ct = database.execute_partitioned_dml(
    "UPDATE Albums SET MarketingBudget = 100000 WHERE SingerId > 1"
)

print("{} records updated.".format(row_ct))

Ruby

使用 execute_partitioned_update() 方法執行分區 DML 陳述式。

# project_id  = "Your Google Cloud project ID"
# instance_id = "Your Spanner instance ID"
# database_id = "Your Spanner database ID"

require "google/cloud/spanner"

spanner = Google::Cloud::Spanner.new project: project_id
client  = spanner.client instance_id, database_id

row_count = client.execute_partition_update(
  "UPDATE Albums SET MarketingBudget = 100000 WHERE SingerId > 1"
)

puts "#{row_count} records updated."

以下程式碼範例會依據 SingerId 資料欄,從 Singers 資料表刪除列。

C++

void DmlPartitionedDelete(google::cloud::spanner::Client client) {
  namespace spanner = ::google::cloud::spanner;
  auto result = client.ExecutePartitionedDml(
      spanner::SqlStatement("DELETE FROM Singers WHERE SingerId > 10"));
  if (!result) throw std::move(result).status();
  std::cout << "Deleted at least " << result->row_count_lower_bound
            << " row(s) [spanner_dml_partitioned_delete]\n";
}

C#


using Google.Cloud.Spanner.Data;
using System;
using System.Threading.Tasks;

public class DeleteUsingPartitionedDmlCoreAsyncSample
{
    public async Task<long> DeleteUsingPartitionedDmlCoreAsync(string projectId, string instanceId, string databaseId)
    {
        string connectionString = $"Data Source=projects/{projectId}/instances/{instanceId}/databases/{databaseId}";

        using var connection = new SpannerConnection(connectionString);
        await connection.OpenAsync();

        using var cmd = connection.CreateDmlCommand("DELETE FROM Singers WHERE SingerId > 10");
        long rowCount = await cmd.ExecutePartitionedUpdateAsync();

        Console.WriteLine($"{rowCount} row(s) deleted...");
        return rowCount;
    }
}

Go


import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/spanner"
)

func deleteUsingPartitionedDML(w io.Writer, db string) error {
	ctx := context.Background()
	client, err := spanner.NewClient(ctx, db)
	if err != nil {
		return err
	}
	defer client.Close()

	stmt := spanner.Statement{SQL: "DELETE FROM Singers WHERE SingerId > 10"}
	rowCount, err := client.PartitionedUpdate(ctx, stmt)
	if err != nil {
		return err

	}
	fmt.Fprintf(w, "%d record(s) deleted.", rowCount)
	return nil
}

Java

static void deleteUsingPartitionedDml(DatabaseClient dbClient) {
  String sql = "DELETE FROM Singers WHERE SingerId > 10";
  long rowCount = dbClient.executePartitionedUpdate(Statement.of(sql));
  System.out.printf("%d records deleted.\n", rowCount);
}

Node.js

// Imports the Google Cloud client library
const {Spanner} = require('@google-cloud/spanner');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// const projectId = 'my-project-id';
// const instanceId = 'my-instance';
// const databaseId = 'my-database';

// Creates a client
const spanner = new Spanner({
  projectId: projectId,
});

// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);

try {
  const [rowCount] = await database.runPartitionedUpdate({
    sql: 'DELETE FROM Singers WHERE SingerId > 10',
  });
  console.log(`Successfully deleted ${rowCount} records.`);
} catch (err) {
  console.error('ERROR:', err);
} finally {
  // Close the database when finished.
  database.close();
}

PHP

use Google\Cloud\Spanner\SpannerClient;

/**
 * Delete sample data in the database by partition with a DML statement.
 *
 * This updates the `MarketingBudget` column which must be created before
 * running this sample. You can add the column by running the `add_column`
 * sample or by running this DDL statement against your database:
 *
 *     ALTER TABLE Albums ADD COLUMN MarketingBudget INT64
 *
 * Example:
 * ```
 * update_data($instanceId, $databaseId);
 * ```
 *
 * @param string $instanceId The Spanner instance ID.
 * @param string $databaseId The Spanner database ID.
 */
function delete_data_with_partitioned_dml(string $instanceId, string $databaseId): void
{
    $spanner = new SpannerClient();
    $instance = $spanner->instance($instanceId);
    $database = $instance->database($databaseId);

    $rowCount = $database->executePartitionedUpdate(
        'DELETE FROM Singers WHERE SingerId > 10'
    );

    printf('Deleted %d row(s).' . PHP_EOL, $rowCount);
}

Python

# instance_id = "your-spanner-instance"
# database_id = "your-spanner-db-id"
spanner_client = spanner.Client()
instance = spanner_client.instance(instance_id)
database = instance.database(database_id)

row_ct = database.execute_partitioned_dml("DELETE FROM Singers WHERE SingerId > 10")

print("{} record(s) deleted.".format(row_ct))

Ruby

# project_id  = "Your Google Cloud project ID"
# instance_id = "Your Spanner instance ID"
# database_id = "Your Spanner database ID"

require "google/cloud/spanner"

spanner = Google::Cloud::Spanner.new project: project_id
client  = spanner.client instance_id, database_id

row_count = client.execute_partition_update(
  "DELETE FROM Singers WHERE SingerId > 10"
)

puts "#{row_count} records deleted."

語意

本節說明分區 DML 的語意。

瞭解分區 DML 執行作業

無論您使用的是用戶端程式庫方法或 Google Cloud CLI,一次都只能執行一個分區 DML 陳述式。

分區交易不支援修訂或復原。 Spanner 會立即執行並套用 DML 陳述式。如果您取消作業或作業失敗,Spanner 會取消所有執行中的分區,也不會啟動其他任何剩餘的分區。不過,Spanner 不會復原任何已執行的分區。

分區 DML 鎖定獲取策略

為減少鎖定爭用情形,分區 DML 只會取得符合 WHERE 子句的資料列讀取鎖定。用於每個分割區的較小獨立交易也會在較短時間內保留鎖定。

工作階段交易限制

Spanner 中的每個工作階段一次只能有一筆有效交易。包括獨立的讀取和查詢作業,這些作業會在內部使用交易,並計入這項限制。交易完成後,工作階段可立即重複用於下一筆交易,不必為每筆交易建立新的工作階段。

舊的讀取時間戳記和版本垃圾收集

Spanner 會執行版本垃圾收集作業,收集已刪除或覆寫的資料,並回收儲存空間。根據預設,系統會回收超過一小時的資料。Spanner 無法讀取時間戳記早於設定 VERSION_RETENTION_PERIOD 的資料,預設值為一小時,但最多可設定為一週。執行期間讀取作業過舊時,作業會失敗並傳回 FAILED_PRECONDITION 錯誤。

查詢變更串流

變更串流是一種結構定義物件,可供您設定,用來監控整個資料庫、特定資料表,或資料庫中定義的一組資料欄的資料修改情形。

建立變更串流時,Spanner 會定義對應的 SQL 資料表值函式 (TVF)。您可以使用這個 TVF,透過 sessions.executeStreamingSql 方法查詢相關聯變更串流中的變更記錄。TVF 的名稱是根據變更串流的名稱產生,且一律以 READ_ 開頭。

變更串流 TVF 的所有查詢都必須使用 sessions.executeStreamingSql API,在具有強式唯讀 timestamp_bound 的一次性唯讀交易中執行。變更串流 TVF 可讓您指定時間範圍的 start_timestampend_timestamp。您可以使用這個強大的唯讀 timestamp_bound,存取保留期限內的所有變更記錄。所有其他TransactionOptions都不適用於變更串流查詢。

此外,如果 TransactionOptions.read_only.return_read_timestamp 設為 true,描述交易的 Transaction 訊息會傳回 2^63 - 2 的特殊值,而非有效的讀取時間戳記。您應捨棄這個特殊值,且不得用於任何後續查詢。

詳情請參閱「變更串流查詢工作流程」。

閒置交易

如果交易沒有未完成的讀取作業或 SQL 查詢,且過去 10 秒內未啟動任何作業,就會視為閒置。Spanner 可以中止閒置交易,防止交易無限期保留鎖定。如果閒置交易遭到中止,提交作業就會失敗,並傳回 ABORTED 錯誤。在交易中定期執行小型查詢 (例如 SELECT 1),可避免交易閒置。