Objectives
This tutorial walks you through the following steps using the Spanner PGAdapter local proxy for PostgreSQL drivers:
- Create a Spanner instance and database.
- Write data to, read data from, and execute SQL queries on data in the database.
- Update the database schema.
- Update data using a read-write transaction.
- Add a secondary index to the database.
- Use the index to read data and execute SQL queries on data.
- Retrieve data using a read-only transaction.
Costs
This tutorial uses Spanner, which is a billable component of Google Cloud. For information on the cost of using Spanner, see Pricing.
Before you begin
Complete the steps described in Set up, which cover creating and setting a default Google Cloud project, enabling billing, enabling the Cloud Spanner API, and setting up OAuth 2.0 to get authentication credentials to use the Cloud Spanner API.
In particular, make sure that you run gcloud auth application-default login to set up the authentication credentials for your local development environment.
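For example, run the following command and complete the browser prompt; the cached credentials are then picked up automatically by PGAdapter:
gcloud auth application-default login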
Prepare your local PGAdapter environment
You can use PostgreSQL drivers in combination with PGAdapter to connect to Spanner. PGAdapter is a local proxy that translates the PostgreSQL network protocol to the Spanner gRPC protocol.
PGAdapter requires either Java or Docker to run.
If neither Java nor Docker is already installed on your development machine, install one of them.
Clone the sample app repository to your local machine:
git clone https://github.com/GoogleCloudPlatform/pgadapter.git
Change to the directory that contains the Spanner sample code:
psql
cd pgadapter/samples/snippets/psql-snippets
Java
cd pgadapter/samples/snippets/java-snippets
mvn package -DskipTests
Go
cd pgadapter/samples/snippets/golang-snippets
Node.js
cd pgadapter/samples/snippets/nodejs-snippets
npm install
Python
cd pgadapter/samples/snippets/python-snippets
python -m venv ./venv
pip install -r requirements.txt
cd samples
C#
cd pgadapter/samples/snippets/dotnet-snippets
PHP
cd pgadapter/samples/snippets/php-snippets
composer install
cd samples
Create an instance
When you first use Spanner, you must create an instance, which is an allocation of resources that are used by Spanner databases. When you create an instance, you choose an instance configuration, which determines where your data is stored, and the number of nodes to use, which determines the amount of serving and storage resources in your instance.
Execute the following command to create a Spanner instance in the us-central1 region with 1 node:
gcloud spanner instances create test-instance --config=regional-us-central1 \
--description="Test Instance" --nodes=1
Note that this creates an instance with the following characteristics:
- Instance ID: test-instance
- Display name: Test Instance
- Instance configuration: regional-us-central1 (Regional configurations store data in one region, while multi-region configurations distribute data across multiple regions. For more information, see About instances.)
- Node count: 1 (node_count corresponds to the amount of serving and storage resources available to the databases in the instance. For more information, see Nodes and processing units.)
You should see:
Creating instance...done.
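You can optionally confirm that the instance exists by describing it with the gcloud CLI:
gcloud spanner instances describe test-instance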
Browse through the sample files
The samples repository contains a sample that shows how to use Spanner with PGAdapter.
Take a look through the samples/snippets folder, which shows how to use Spanner. The code shows how to create and use a new database. The data uses the example schema shown in the Schema and data model page.
Start PGAdapter
Start PGAdapter on your local development machine and point it to the instance that you created.
The following commands assume that you have run gcloud auth application-default login.
Java application
wget https://storage.googleapis.com/pgadapter-jar-releases/pgadapter.tar.gz \
&& tar -xzvf pgadapter.tar.gz
java -jar pgadapter.jar -i test-instance
Docker
docker pull gcr.io/cloud-spanner-pg-adapter/pgadapter
docker run \
--name pgadapter \
--rm -d -p 5432:5432 \
-v "$HOME/.config/gcloud":/gcloud:ro \
--env CLOUDSDK_CONFIG=/gcloud \
gcr.io/cloud-spanner-pg-adapter/pgadapter \
-i test-instance -x
Emulator
docker pull gcr.io/cloud-spanner-pg-adapter/pgadapter-emulator
docker run \
--name pgadapter-emulator \
--rm -d \
-p 5432:5432 \
-p 9010:9010 \
-p 9020:9020 \
gcr.io/cloud-spanner-pg-adapter/pgadapter-emulator
This starts PGAdapter together with an embedded Spanner emulator. The embedded emulator automatically creates any Spanner instance or database that you connect to, so you don't need to create them manually beforehand.
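As a quick end-to-end check against the emulator, you can connect with psql using a fully qualified database name. This is a minimal sketch; the project, instance, and database names are placeholders that the embedded emulator creates on the fly:
psql -h localhost -p 5432 -d "projects/test-project/instances/test-instance/databases/example-db" -c "select 'Hello emulator' as hello"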
We recommend that you run PGAdapter as a sidecar container or as an in-process dependency in production environments. For more information on deploying PGAdapter in production, see Choose a method for running PGAdapter.
Create a database
gcloud spanner databases create example-db --instance=test-instance \
--database-dialect=POSTGRESQL
You should see:
Creating database...done.
Create tables
The following code creates two tables in the database.
psql
#!/bin/bash
# Set the connection variables for psql.
# The following statements use the existing value of the variable if it has
# already been set, and otherwise assigns a default value.
export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"
# Create two tables in one batch.
psql << SQL
-- Create the singers table
CREATE TABLE singers (
singer_id bigint not null primary key,
first_name character varying(1024),
last_name character varying(1024),
singer_info bytea,
full_name character varying(2048) GENERATED ALWAYS
AS (first_name || ' ' || last_name) STORED
);
-- Create the albums table. This table is interleaved in the parent table
-- "singers".
CREATE TABLE albums (
singer_id bigint not null,
album_id bigint not null,
album_title character varying(1024),
primary key (singer_id, album_id)
)
-- The 'interleave in parent' clause is a Spanner-specific extension to
-- open-source PostgreSQL.
INTERLEAVE IN PARENT singers ON DELETE CASCADE;
SQL
echo "Created Singers & Albums tables in database: [${PGDATABASE}]"
Java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
class CreateTables {
static void createTables(String host, int port, String database) throws SQLException {
String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
try (Connection connection = DriverManager.getConnection(connectionUrl)) {
try (Statement statement = connection.createStatement()) {
// Create two tables in one batch.
statement.addBatch(
"create table singers ("
+ " singer_id bigint primary key not null,"
+ " first_name varchar(1024),"
+ " last_name varchar(1024),"
+ " singer_info bytea,"
+ " full_name varchar(2048) generated always as (\n"
+ " case when first_name is null then last_name\n"
+ " when last_name is null then first_name\n"
+ " else first_name || ' ' || last_name\n"
+ " end) stored"
+ ")");
statement.addBatch(
"create table albums ("
+ " singer_id bigint not null,"
+ " album_id bigint not null,"
+ " album_title varchar,"
+ " primary key (singer_id, album_id)"
+ ") interleave in parent singers on delete cascade");
statement.executeBatch();
System.out.println("Created Singers & Albums tables in database: [" + database + "]");
}
}
}
}
Go
import (
"context"
"fmt"
"github.com/jackc/pgx/v5"
)
func CreateTables(host string, port int, database string) error {
ctx := context.Background()
connString := fmt.Sprintf(
"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
host, port, database)
conn, err := pgx.Connect(ctx, connString)
if err != nil {
return err
}
defer conn.Close(ctx)
// Create two tables in one batch on Spanner.
br := conn.SendBatch(ctx, &pgx.Batch{QueuedQueries: []*pgx.QueuedQuery{
{SQL: "create table singers (" +
" singer_id bigint primary key not null," +
" first_name character varying(1024)," +
" last_name character varying(1024)," +
" singer_info bytea," +
" full_name character varying(2048) generated " +
" always as (first_name || ' ' || last_name) stored" +
")"},
{SQL: "create table albums (" +
" singer_id bigint not null," +
" album_id bigint not null," +
" album_title character varying(1024)," +
" primary key (singer_id, album_id)" +
") interleave in parent singers on delete cascade"},
}})
cmd, err := br.Exec()
if err != nil {
return err
}
if cmd.String() != "CREATE" {
return fmt.Errorf("unexpected command tag: %v", cmd.String())
}
if err := br.Close(); err != nil {
return err
}
fmt.Printf("Created Singers & Albums tables in database: [%s]\n", database)
return nil
}
Node.js
import { Client } from 'pg';
async function createTables(host: string, port: number, database: string): Promise<void> {
// Connect to Spanner through PGAdapter.
const connection = new Client({
host: host,
port: port,
database: database,
});
await connection.connect();
// Create two tables in one batch.
await connection.query("start batch ddl");
await connection.query("create table singers (" +
" singer_id bigint primary key not null," +
" first_name character varying(1024)," +
" last_name character varying(1024)," +
" singer_info bytea," +
" full_name character varying(2048) generated " +
" always as (first_name || ' ' || last_name) stored" +
")");
await connection.query("create table albums (" +
" singer_id bigint not null," +
" album_id bigint not null," +
" album_title character varying(1024)," +
" primary key (singer_id, album_id)" +
") interleave in parent singers on delete cascade");
await connection.query("run batch");
console.log(`Created Singers & Albums tables in database: [${database}]`);
// Close the connection.
await connection.end();
}
Python
import psycopg
def create_tables(host: str, port: int, database: str):
# Connect to Cloud Spanner using psycopg3 through PGAdapter.
with psycopg.connect("host={host} port={port} "
"dbname={database} "
"sslmode=disable".format(host=host, port=port,
database=database)) as conn:
# Enable autocommit to execute DDL statements, as psycopg otherwise
# tries to use a read/write transaction.
conn.autocommit = True
# Use a pipeline to execute multiple DDL statements in one batch.
with conn.pipeline():
conn.execute("create table singers ("
+ " singer_id bigint primary key not null,"
+ " first_name character varying(1024),"
+ " last_name character varying(1024),"
+ " singer_info bytea,"
+ " full_name character varying(2048) generated "
+ " always as (first_name || ' ' || last_name) stored"
+ ")")
conn.execute("create table albums ("
+ " singer_id bigint not null,"
+ " album_id bigint not null,"
+ " album_title character varying(1024),"
+ " primary key (singer_id, album_id)"
+ ") interleave in parent singers on delete cascade")
print("Created Singers & Albums tables in database: [{database}]"
.format(database=database))
C#
using Npgsql;
namespace dotnet_snippets;
public static class CreateTablesSample
{
public static void CreateTables(string host, int port, string database)
{
var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
using var connection = new NpgsqlConnection(connectionString);
connection.Open();
// Create two tables in one batch.
var batch = connection.CreateBatch();
batch.BatchCommands.Add(new NpgsqlBatchCommand(
"create table singers ("
+ " singer_id bigint primary key not null,"
+ " first_name varchar(1024),"
+ " last_name varchar(1024),"
+ " singer_info bytea,"
+ " full_name varchar(2048) generated always as (\n"
+ " case when first_name is null then last_name\n"
+ " when last_name is null then first_name\n"
+ " else first_name || ' ' || last_name\n"
+ " end) stored"
+ ")"));
batch.BatchCommands.Add(new NpgsqlBatchCommand(
"create table albums ("
+ " singer_id bigint not null,"
+ " album_id bigint not null,"
+ " album_title varchar,"
+ " primary key (singer_id, album_id)"
+ ") interleave in parent singers on delete cascade"));
batch.ExecuteNonQuery();
Console.WriteLine($"Created Singers & Albums tables in database: [{database}]");
}
}
PHP
function create_tables(string $host, string $port, string $database): void
{
// Connect to Spanner through PGAdapter using the PostgreSQL PDO driver.
$dsn = sprintf("pgsql:host=%s;port=%s;dbname=%s", $host, $port, $database);
$connection = new PDO($dsn);
// Create two tables in one batch.
$connection->exec("start batch ddl");
$connection->exec("create table singers ("
." singer_id bigint primary key not null,"
." first_name character varying(1024),"
." last_name character varying(1024),"
." singer_info bytea,"
." full_name character varying(2048) generated "
." always as (first_name || ' ' || last_name) stored"
.")");
$connection->exec("create table albums ("
." singer_id bigint not null,"
." album_id bigint not null,"
." album_title character varying(1024),"
." primary key (singer_id, album_id)"
.") interleave in parent singers on delete cascade");
$connection->exec("run batch");
print("Created Singers & Albums tables in database: [{$database}]\n");
$connection = null;
}
Run the sample with the following command:
psql
PGDATABASE=example-db ./create_tables.sh
Java
java -jar target/pgadapter-snippets/pgadapter-samples.jar createtables example-db
Go
go run sample_runner.go createtables example-db
Node.js
npm start createtables example-db
Python
python create_tables.py example-db
C#
dotnet run createtables example-db
PHP
php create_tables.php example-db
The next step is to write data to your database.
Create a connection
You must create a connection to PGAdapter before you can read or write data. All of your interactions with Spanner must go through a Connection. The database name is specified in the connection string.
psql
#!/bin/bash
# Set the connection variables for psql.
# The following statements use the existing value of the variable if it has
# already been set, and otherwise assigns a default value.
export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"
# Connect to Cloud Spanner using psql through PGAdapter
# and execute a simple query.
psql -c "select 'Hello world!' as hello"
Java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
class CreateConnection {
static void createConnection(String host, int port, String database) throws SQLException {
String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
try (Connection connection = DriverManager.getConnection(connectionUrl)) {
try (ResultSet resultSet =
connection.createStatement().executeQuery("select 'Hello world!' as hello")) {
while (resultSet.next()) {
System.out.printf("Greeting from Cloud Spanner PostgreSQL: %s\n", resultSet.getString(1));
}
}
}
}
}
Go
import (
"context"
"fmt"
"github.com/jackc/pgx/v5"
)
func CreateConnection(host string, port int, database string) error {
ctx := context.Background()
// Connect to Cloud Spanner using pgx through PGAdapter.
// 'sslmode=disable' is optional, but adding it reduces the connection time,
// as pgx will then skip first trying to create an SSL connection.
connString := fmt.Sprintf(
"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
host, port, database)
conn, err := pgx.Connect(ctx, connString)
if err != nil {
return err
}
defer conn.Close(ctx)
row := conn.QueryRow(ctx, "select 'Hello world!' as hello")
var msg string
if err := row.Scan(&msg); err != nil {
return err
}
fmt.Printf("Greeting from Cloud Spanner PostgreSQL: %s\n", msg)
return nil
}
Node.js
import { Client } from 'pg';
async function createConnection(host: string, port: number, database: string): Promise<void> {
// Connect to Spanner through PGAdapter.
const connection = new Client({
host: host,
port: port,
database: database,
});
await connection.connect();
const result = await connection.query("select 'Hello world!' as hello");
console.log(`Greeting from Cloud Spanner PostgreSQL: ${result.rows[0]['hello']}`);
// Close the connection.
await connection.end();
}
Python
import string
import psycopg
def create_connection(host: string, port: int, database: string):
# Connect to Cloud Spanner using psycopg3 through PGAdapter.
# 'sslmode=disable' is optional, but adding it reduces the connection time,
# as psycopg3 will then skip first trying to create an SSL connection.
with psycopg.connect("host={host} port={port} dbname={database} "
"sslmode=disable".format(host=host,
port=port,
database=database)) as conn:
conn.autocommit = True
with conn.cursor() as cur:
cur.execute("select 'Hello world!' as hello")
print("Greeting from Cloud Spanner PostgreSQL:", cur.fetchone()[0])
C#
using Npgsql;
namespace dotnet_snippets;
public static class CreateConnectionSample
{
public static void CreateConnection(string host, int port, string database)
{
var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
using var connection = new NpgsqlConnection(connectionString);
connection.Open();
using var cmd = new NpgsqlCommand("select 'Hello World!' as hello", connection);
using var reader = cmd.ExecuteReader();
while (reader.Read())
{
var greeting = reader.GetString(0);
Console.WriteLine($"Greeting from Cloud Spanner PostgreSQL: {greeting}");
}
}
}
PHP
function create_connection(string $host, string $port, string $database): void
{
// Connect to Spanner through PGAdapter using the PostgreSQL PDO driver.
$dsn = sprintf("pgsql:host=%s;port=%s;dbname=%s", $host, $port, $database);
$connection = new PDO($dsn);
// Execute a query on Spanner through PGAdapter.
$statement = $connection->query("select 'Hello world!' as hello");
$rows = $statement->fetchAll();
printf("Greeting from Cloud Spanner PostgreSQL: %s\n", $rows[0][0]);
// Cleanup resources.
$rows = null;
$statement = null;
$connection = null;
}
Run the sample with the following command:
psql
PGDATABASE=example-db ./create_connection.sh
Java
java -jar target/pgadapter-snippets/pgadapter-samples.jar createconnection example-db
Go
go run sample_runner.go createconnection example-db
Node.js
npm start createconnection example-db
Python
python create_connection.py example-db
C#
dotnet run createconnection example-db
PHP
php create_connection.php example-db
Write data with DML
You can insert data using Data Manipulation Language (DML) statements in a read-write transaction.
These samples show how to execute a DML statement on Spanner using a PostgreSQL driver.
psql
#!/bin/bash
export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"
psql -c "INSERT INTO singers (singer_id, first_name, last_name) VALUES
(12, 'Melissa', 'Garcia'),
(13, 'Russel', 'Morales'),
(14, 'Jacqueline', 'Long'),
(15, 'Dylan', 'Shaw')"
echo "4 records inserted"
Java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
class WriteDataWithDml {
static class Singer {
private final long singerId;
private final String firstName;
private final String lastName;
Singer(final long id, final String first, final String last) {
this.singerId = id;
this.firstName = first;
this.lastName = last;
}
}
static void writeDataWithDml(String host, int port, String database) throws SQLException {
String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
try (Connection connection = DriverManager.getConnection(connectionUrl)) {
// Add 4 rows in one statement.
// JDBC always uses '?' as a parameter placeholder.
try (PreparedStatement preparedStatement =
connection.prepareStatement(
"INSERT INTO singers (singer_id, first_name, last_name) VALUES "
+ "(?, ?, ?), "
+ "(?, ?, ?), "
+ "(?, ?, ?), "
+ "(?, ?, ?)")) {
final List<Singer> singers =
Arrays.asList(
new Singer(/* SingerId= */ 12L, "Melissa", "Garcia"),
new Singer(/* SingerId= */ 13L, "Russel", "Morales"),
new Singer(/* SingerId= */ 14L, "Jacqueline", "Long"),
new Singer(/* SingerId= */ 15L, "Dylan", "Shaw"));
// Note that JDBC parameters start at index 1.
int paramIndex = 0;
for (Singer singer : singers) {
preparedStatement.setLong(++paramIndex, singer.singerId);
preparedStatement.setString(++paramIndex, singer.firstName);
preparedStatement.setString(++paramIndex, singer.lastName);
}
int updateCount = preparedStatement.executeUpdate();
System.out.printf("%d records inserted.\n", updateCount);
}
}
}
}
Go
import (
"context"
"fmt"
"github.com/jackc/pgx/v5"
)
func WriteDataWithDml(host string, port int, database string) error {
ctx := context.Background()
connString := fmt.Sprintf(
"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
host, port, database)
conn, err := pgx.Connect(ctx, connString)
if err != nil {
return err
}
defer conn.Close(ctx)
tag, err := conn.Exec(ctx,
"INSERT INTO singers (singer_id, first_name, last_name) "+
"VALUES ($1, $2, $3), ($4, $5, $6), "+
" ($7, $8, $9), ($10, $11, $12)",
12, "Melissa", "Garcia",
13, "Russel", "Morales",
14, "Jacqueline", "Long",
15, "Dylan", "Shaw")
if err != nil {
return err
}
fmt.Printf("%v records inserted\n", tag.RowsAffected())
return nil
}
Node.js
import { Client } from 'pg';
async function writeDataWithDml(host: string, port: number, database: string): Promise<void> {
const connection = new Client({
host: host,
port: port,
database: database,
});
await connection.connect();
const result = await connection.query("INSERT INTO singers (singer_id, first_name, last_name) " +
"VALUES ($1, $2, $3), ($4, $5, $6), " +
" ($7, $8, $9), ($10, $11, $12)",
[12, "Melissa", "Garcia",
13, "Russel", "Morales",
14, "Jacqueline", "Long",
15, "Dylan", "Shaw"])
console.log(`${result.rowCount} records inserted`);
// Close the connection.
await connection.end();
}
Python
import psycopg
def write_data_with_dml(host: str, port: int, database: str):
with psycopg.connect("host={host} port={port} dbname={database} "
"sslmode=disable".format(host=host,
port=port,
database=database)) as conn:
conn.autocommit = True
with conn.cursor() as cur:
cur.execute("INSERT INTO singers (singer_id, first_name, last_name)"
" VALUES (%s, %s, %s), (%s, %s, %s), "
" (%s, %s, %s), (%s, %s, %s)",
(12, "Melissa", "Garcia",
13, "Russel", "Morales",
14, "Jacqueline", "Long",
15, "Dylan", "Shaw",))
print("%d records inserted" % cur.rowcount)
C#
using Npgsql;
namespace dotnet_snippets;
public static class WriteDataWithDmlSample
{
readonly struct Singer
{
public Singer(long singerId, string firstName, string lastName)
{
SingerId = singerId;
FirstName = firstName;
LastName = lastName;
}
public long SingerId { get; }
public string FirstName { get; }
public string LastName { get; }
}
public static void WriteDataWithDml(string host, int port, string database)
{
var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
using var connection = new NpgsqlConnection(connectionString);
connection.Open();
// Add 4 rows in one statement.
using var cmd = new NpgsqlCommand("INSERT INTO singers (singer_id, first_name, last_name) VALUES "
+ "($1, $2, $3), "
+ "($4, $5, $6), "
+ "($7, $8, $9), "
+ "($10, $11, $12)", connection);
List<Singer> singers =
[
new Singer(/* SingerId = */ 12L, "Melissa", "Garcia"),
new Singer(/* SingerId = */ 13L, "Russel", "Morales"),
new Singer(/* SingerId = */ 14L, "Jacqueline", "Long"),
new Singer(/* SingerId = */ 15L, "Dylan", "Shaw")
];
foreach (var singer in singers)
{
cmd.Parameters.Add(new NpgsqlParameter { Value = singer.SingerId });
cmd.Parameters.Add(new NpgsqlParameter { Value = singer.FirstName });
cmd.Parameters.Add(new NpgsqlParameter { Value = singer.LastName });
}
var updateCount = cmd.ExecuteNonQuery();
Console.WriteLine($"{updateCount} records inserted.");
}
}
PHP
function write_data_with_dml(string $host, string $port, string $database): void
{
$dsn = sprintf("pgsql:host=%s;port=%s;dbname=%s", $host, $port, $database);
$connection = new PDO($dsn);
$sql = "INSERT INTO singers (singer_id, first_name, last_name)"
." VALUES (?, ?, ?), (?, ?, ?), "
." (?, ?, ?), (?, ?, ?)";
$statement = $connection->prepare($sql);
$statement->execute([
12, "Melissa", "Garcia",
13, "Russel", "Morales",
14, "Jacqueline", "Long",
15, "Dylan", "Shaw"
]);
printf("%d records inserted\n", $statement->rowCount());
$statement = null;
$connection = null;
}
Run the sample with the following command:
psql
PGDATABASE=example-db ./write_data_with_dml.sh
Java
java -jar target/pgadapter-snippets/pgadapter-samples.jar writeusingdml example-db
Go
go run sample_runner.go writeusingdml example-db
Node.js
npm start writeusingdml example-db
Python
python write_data_with_dml.py example-db
C#
dotnet run writeusingdml example-db
PHP
php write_data_with_dml.php example-db
You should see the following response:
4 records inserted.
Write data with a DML batch
PGAdapter supports executing DML batches. Sending multiple DML statements in one batch reduces the number of round trips to Spanner and improves the performance of your application.
psql
#!/bin/bash
export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"
# Create a prepared insert statement and execute this prepared
# insert statement three times in one SQL string. The single
# SQL string with three insert statements will be executed as
# a single DML batch on Spanner.
psql -c "PREPARE insert_singer AS
INSERT INTO singers (singer_id, first_name, last_name)
VALUES (\$1, \$2, \$3)" \
-c "EXECUTE insert_singer (16, 'Sarah', 'Wilson');
EXECUTE insert_singer (17, 'Ethan', 'Miller');
EXECUTE insert_singer (18, 'Maya', 'Patel');"
echo "3 records inserted"
Java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
class WriteDataWithDmlBatch {
static class Singer {
private final long singerId;
private final String firstName;
private final String lastName;
Singer(final long id, final String first, final String last) {
this.singerId = id;
this.firstName = first;
this.lastName = last;
}
}
static void writeDataWithDmlBatch(String host, int port, String database) throws SQLException {
String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
try (Connection connection = DriverManager.getConnection(connectionUrl)) {
// Add multiple rows in one DML batch.
// JDBC always uses '?' as a parameter placeholder.
try (PreparedStatement preparedStatement =
connection.prepareStatement(
"INSERT INTO singers (singer_id, first_name, last_name) VALUES (?, ?, ?)")) {
final List<Singer> singers =
Arrays.asList(
new Singer(/* SingerId= */ 16L, "Sarah", "Wilson"),
new Singer(/* SingerId= */ 17L, "Ethan", "Miller"),
new Singer(/* SingerId= */ 18L, "Maya", "Patel"));
for (Singer singer : singers) {
// Note that JDBC parameters start at index 1.
int paramIndex = 0;
preparedStatement.setLong(++paramIndex, singer.singerId);
preparedStatement.setString(++paramIndex, singer.firstName);
preparedStatement.setString(++paramIndex, singer.lastName);
preparedStatement.addBatch();
}
int[] updateCounts = preparedStatement.executeBatch();
System.out.printf("%d records inserted.\n", Arrays.stream(updateCounts).sum());
}
}
}
}
Go
import (
"context"
"fmt"
"github.com/jackc/pgx/v5"
)
func WriteDataWithDmlBatch(host string, port int, database string) error {
ctx := context.Background()
connString := fmt.Sprintf(
"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
host, port, database)
conn, err := pgx.Connect(ctx, connString)
if err != nil {
return err
}
defer conn.Close(ctx)
sql := "INSERT INTO singers (singer_id, first_name, last_name) " +
"VALUES ($1, $2, $3)"
batch := &pgx.Batch{}
batch.Queue(sql, 16, "Sarah", "Wilson")
batch.Queue(sql, 17, "Ethan", "Miller")
batch.Queue(sql, 18, "Maya", "Patel")
br := conn.SendBatch(ctx, batch)
_, err = br.Exec()
if err := br.Close(); err != nil {
return err
}
if err != nil {
return err
}
fmt.Printf("%v records inserted\n", batch.Len())
return nil
}
Node.js
import { Client } from 'pg';
async function writeDataWithDmlBatch(host: string, port: number, database: string): Promise<void> {
const connection = new Client({
host: host,
port: port,
database: database,
});
await connection.connect();
// node-postgres does not support PostgreSQL pipeline mode, so we must use the
// `start batch dml` / `run batch` statements to execute a DML batch.
const sql = "INSERT INTO singers (singer_id, first_name, last_name) VALUES ($1, $2, $3)";
await connection.query("start batch dml");
await connection.query(sql, [16, "Sarah", "Wilson"]);
await connection.query(sql, [17, "Ethan", "Miller"]);
await connection.query(sql, [18, "Maya", "Patel"]);
const result = await connection.query("run batch");
// RUN BATCH returns the update counts as an array of strings, with one element for each
// DML statement in the batch. This calculates the total number of affected rows from that array.
const updateCount = result.rows[0]["UPDATE_COUNTS"]
.map((s: string) => parseInt(s))
.reduce((c: number, current: number) => c + current, 0);
console.log(`${updateCount} records inserted`);
// Close the connection.
await connection.end();
}
Python
import psycopg
def write_data_with_dml_batch(host: str, port: int, database: str):
with psycopg.connect("host={host} port={port} dbname={database} "
"sslmode=disable".format(host=host,
port=port,
database=database)) as conn:
conn.autocommit = True
with conn.cursor() as cur:
cur.executemany("INSERT INTO singers "
"(singer_id, first_name, last_name) "
"VALUES (%s, %s, %s)",
[(16, "Sarah", "Wilson",),
(17, "Ethan", "Miller",),
(18, "Maya", "Patel",), ])
print("%d records inserted" % cur.rowcount)
C#
using Npgsql;
namespace dotnet_snippets;
public static class WriteDataWithDmlBatchSample
{
readonly struct Singer
{
public Singer(long singerId, string firstName, string lastName)
{
SingerId = singerId;
FirstName = firstName;
LastName = lastName;
}
public long SingerId { get; }
public string FirstName { get; }
public string LastName { get; }
}
public static void WriteDataWithDmlBatch(string host, int port, string database)
{
var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
using var connection = new NpgsqlConnection(connectionString);
connection.Open();
// Add multiple rows in one DML batch.
const string sql = "INSERT INTO singers (singer_id, first_name, last_name) VALUES ($1, $2, $3)";
List<Singer> singers =
[
new Singer(/* SingerId = */ 16L, "Sarah", "Wilson"),
new Singer(/* SingerId = */ 17L, "Ethan", "Miller"),
new Singer(/* SingerId = */ 18L, "Maya", "Patel")
];
using var batch = new NpgsqlBatch(connection);
foreach (var singer in singers)
{
batch.BatchCommands.Add(new NpgsqlBatchCommand
{
CommandText = sql,
Parameters =
{
new NpgsqlParameter {Value = singer.SingerId},
new NpgsqlParameter {Value = singer.FirstName},
new NpgsqlParameter {Value = singer.LastName}
}
});
}
var updateCount = batch.ExecuteNonQuery();
Console.WriteLine($"{updateCount} records inserted.");
}
}
PHP
function write_data_with_dml_batch(string $host, string $port, string $database): void
{
$dsn = sprintf("pgsql:host=%s;port=%s;dbname=%s", $host, $port, $database);
$connection = new PDO($dsn);
// Use START BATCH DML / RUN BATCH to run a batch of DML statements.
// Create a prepared statement for the DML that should be executed.
$sql = "INSERT INTO singers (singer_id, first_name, last_name) VALUES (?, ?, ?)";
$statement = $connection->prepare($sql);
// Start a DML batch.
$connection->exec("START BATCH DML");
$statement->execute([16, "Sarah", "Wilson"]);
$statement->execute([17, "Ethan", "Miller"]);
$statement->execute([18, "Maya", "Patel"]);
// Run the DML batch. Use the 'query(..)' method, as the update counts are returned as a row
// containing an array with the update count of each statement in the batch.
$statement = $connection->query("RUN BATCH");
$result = $statement->fetchAll();
$update_count = $result[0][0];
printf("%s records inserted\n", $update_count);
$statement = null;
$connection = null;
}
Run the sample with the following command:
psql
PGDATABASE=example-db ./write_data_with_dml_batch.sh
Java
java -jar target/pgadapter-snippets/pgadapter-samples.jar writeusingdmlbatch example-db
Go
go run sample_runner.go writeusingdmlbatch example-db
Node.js
npm start writeusingdmlbatch example-db
Python
python write_data_with_dml_batch.py example-db
C#
dotnet run writeusingdmlbatch example-db
PHP
php write_data_with_dml_batch.php example-db
You should see:
3 records inserted.
Write data with mutations
You can also insert data using mutations.
PGAdapter translates the PostgreSQL COPY command into mutations. Using COPY is the most efficient way to quickly insert data into your Spanner database.
COPY operations are atomic by default. Atomic operations on Spanner are bound by the commit size limit. See CRUD limits for more information.
These examples show how to execute a non-atomic COPY operation. This allows the COPY operation to exceed the commit size limit.
psql
#!/bin/bash
# Get the source directory of this script.
directory=${BASH_SOURCE%/*}/
export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"
# Copy data to Spanner from a tab-separated text file using the COPY command.
psql -c "COPY singers (singer_id, first_name, last_name) FROM STDIN" \
< "${directory}singers_data.txt"
psql -c "COPY albums FROM STDIN" \
< "${directory}albums_data.txt"
echo "Copied singers and albums"
Java
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import org.postgresql.PGConnection;
import org.postgresql.copy.CopyManager;
class WriteDataWithCopy {
static void writeDataWithCopy(String host, int port, String database)
throws SQLException, IOException {
String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
try (Connection connection = DriverManager.getConnection(connectionUrl)) {
// Unwrap the PostgreSQL JDBC connection interface to get access to
// a CopyManager.
PGConnection pgConnection = connection.unwrap(PGConnection.class);
CopyManager copyManager = pgConnection.getCopyAPI();
// Enable 'partitioned_non_atomic' mode. This ensures that the COPY operation
// will succeed even if it exceeds Spanner's mutation limit per transaction.
connection
.createStatement()
.execute("set spanner.autocommit_dml_mode='partitioned_non_atomic'");
long numSingers =
copyManager.copyIn(
"COPY singers (singer_id, first_name, last_name) FROM STDIN",
WriteDataWithCopy.class.getResourceAsStream("singers_data.txt"));
System.out.printf("Copied %d singers\n", numSingers);
long numAlbums =
copyManager.copyIn(
"COPY albums FROM STDIN",
WriteDataWithCopy.class.getResourceAsStream("albums_data.txt"));
System.out.printf("Copied %d albums\n", numAlbums);
}
}
}
Go
import (
"context"
"fmt"
"os"
"github.com/jackc/pgx/v5"
)
func WriteDataWithCopy(host string, port int, database string) error {
ctx := context.Background()
connString := fmt.Sprintf(
"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
host, port, database)
conn, err := pgx.Connect(ctx, connString)
if err != nil {
return err
}
defer conn.Close(ctx)
// Enable 'partitioned_non_atomic' mode. This ensures that the COPY operation
// will succeed even if it exceeds Spanner's mutation limit per transaction.
conn.Exec(ctx, "set spanner.autocommit_dml_mode='partitioned_non_atomic'")
file, err := os.Open("samples/singers_data.txt")
if err != nil {
return err
}
tag, err := conn.PgConn().CopyFrom(ctx, file,
"copy singers (singer_id, first_name, last_name) from stdin")
if err != nil {
return err
}
fmt.Printf("Copied %v singers\n", tag.RowsAffected())
file, err = os.Open("samples/albums_data.txt")
if err != nil {
return err
}
tag, err = conn.PgConn().CopyFrom(ctx, file,
"copy albums from stdin")
if err != nil {
return err
}
fmt.Printf("Copied %v albums\n", tag.RowsAffected())
return nil
}
Node.js
import { Client } from 'pg';
import { pipeline } from 'node:stream/promises'
import fs from 'node:fs'
import { from as copyFrom } from 'pg-copy-streams'
import path from "path";
async function writeDataWithCopy(host: string, port: number, database: string): Promise<void> {
const connection = new Client({
host: host,
port: port,
database: database,
});
await connection.connect();
// Enable 'partitioned_non_atomic' mode. This ensures that the COPY operation
// will succeed even if it exceeds Spanner's mutation limit per transaction.
await connection.query("set spanner.autocommit_dml_mode='partitioned_non_atomic'");
// Copy data from a csv file to Spanner using the COPY command.
// Note that even though the command says 'from stdin', the actual input comes from a file.
const copySingersStream = copyFrom('copy singers (singer_id, first_name, last_name) from stdin');
const ingestSingersStream = connection.query(copySingersStream);
const sourceSingersStream = fs.createReadStream(path.join(__dirname, 'singers_data.txt'));
await pipeline(sourceSingersStream, ingestSingersStream);
console.log(`Copied ${copySingersStream.rowCount} singers`);
const copyAlbumsStream = copyFrom('copy albums from stdin');
const ingestAlbumsStream = connection.query(copyAlbumsStream);
const sourceAlbumsStream = fs.createReadStream(path.join(__dirname, 'albums_data.txt'));
await pipeline(sourceAlbumsStream, ingestAlbumsStream);
console.log(`Copied ${copyAlbumsStream.rowCount} albums`);
// Close the connection.
await connection.end();
}
Python
import os
import psycopg
def write_data_with_copy(host: str, port: int, database: str):
with psycopg.connect("host={host} port={port} dbname={database} "
"sslmode=disable".format(host=host,
port=port,
database=database)) as conn:
script_dir = os.path.dirname(os.path.abspath(__file__))
singers_file_path = os.path.join(script_dir, "singers_data.txt")
albums_file_path = os.path.join(script_dir, "albums_data.txt")
conn.autocommit = True
block_size = 1024
with conn.cursor() as cur:
with open(singers_file_path, "r") as f:
with cur.copy("COPY singers (singer_id, first_name, last_name) "
"FROM STDIN") as copy:
while data := f.read(block_size):
copy.write(data)
print("Copied %d singers" % cur.rowcount)
with open(albums_file_path, "r") as f:
with cur.copy("COPY albums "
"FROM STDIN") as copy:
while data := f.read(block_size):
copy.write(data)
print("Copied %d albums" % cur.rowcount)
C#
using Npgsql;
namespace dotnet_snippets;
public static class WriteDataWithCopySample
{
public static void WriteDataWithCopy(string host, int port, string database)
{
var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
using var connection = new NpgsqlConnection(connectionString);
connection.Open();
// Enable 'partitioned_non_atomic' mode. This ensures that the COPY operation
// will succeed even if it exceeds Spanner's mutation limit per transaction.
using var cmd = new NpgsqlCommand("set spanner.autocommit_dml_mode='partitioned_non_atomic'", connection);
cmd.ExecuteNonQuery();
var singerCount = 0;
using var singerReader = new StreamReader("singers_data.txt");
using (var singerWriter = connection.BeginTextImport("COPY singers (singer_id, first_name, last_name) FROM STDIN"))
{
while (singerReader.ReadLine() is { } line)
{
singerWriter.WriteLine(line);
singerCount++;
}
}
Console.WriteLine($"Copied {singerCount} singers");
var albumCount = 0;
using var albumReader = new StreamReader("albums_data.txt");
using (var albumWriter = connection.BeginTextImport("COPY albums FROM STDIN"))
{
while (albumReader.ReadLine() is { } line)
{
albumWriter.WriteLine(line);
albumCount++;
}
}
Console.WriteLine($"Copied {albumCount} albums");
}
}
PHP
function write_data_with_copy(string $host, string $port, string $database): void
{
$dsn = sprintf("pgsql:host=%s;port=%s;dbname=%s", $host, $port, $database);
$connection = new PDO($dsn);
$dir = dirname(__FILE__);
$connection->pgsqlCopyFromFile(
"singers",
sprintf("%s/singers_data.txt", $dir),
"\t",
"\\\\N",
"singer_id, first_name, last_name");
print("Copied 5 singers\n");
$connection->pgsqlCopyFromFile(
"albums",
sprintf("%s/albums_data.txt", $dir));
print("Copied 5 albums\n");
$connection = null;
}
Run the sample with the following command:
psql
PGDATABASE=example-db ./write_data_with_copy.sh
Java
java -jar target/pgadapter-snippets/pgadapter-samples.jar write example-db
Go
go run sample_runner.go write example-db
Node.js
npm start write example-db
Python
python write_data_with_copy.py example-db
C#
dotnet run write example-db
PHP
php write_data_with_copy.php example-db
You should see:
Copied 5 singers
Copied 5 albums
Query data using SQL
Spanner supports a SQL interface for reading data, which you can access on the command line using the Google Cloud CLI or programmatically using a PostgreSQL driver.
On the command line
Execute the following SQL statement to read the values of all columns from the Albums table:
gcloud spanner databases execute-sql example-db --instance=test-instance \
--sql='SELECT singer_id, album_id, album_title FROM albums'
The result should be:
SingerId AlbumId AlbumTitle
1 1 Total Junk
1 2 Go, Go, Go
2 1 Green
2 2 Forever Hold Your Peace
2 3 Terrified
Use the PostgreSQL driver
In addition to executing a SQL statement on the command line, you can issue the same SQL statement programmatically using a PostgreSQL driver.
psql
#!/bin/bash
export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"
psql -c "SELECT singer_id, album_id, album_title
FROM albums"
Java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
class QueryData {
static void queryData(String host, int port, String database) throws SQLException {
String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
try (Connection connection = DriverManager.getConnection(connectionUrl)) {
try (ResultSet resultSet =
connection
.createStatement()
.executeQuery("SELECT singer_id, album_id, album_title FROM albums")) {
while (resultSet.next()) {
System.out.printf(
"%d %d %s\n",
resultSet.getLong("singer_id"),
resultSet.getLong("album_id"),
resultSet.getString("album_title"));
}
}
}
}
}
Go
import (
"context"
"fmt"
"github.com/jackc/pgx/v5"
)
func QueryData(host string, port int, database string) error {
ctx := context.Background()
connString := fmt.Sprintf(
"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
host, port, database)
conn, err := pgx.Connect(ctx, connString)
if err != nil {
return err
}
defer conn.Close(ctx)
rows, err := conn.Query(ctx, "SELECT singer_id, album_id, album_title "+
"FROM albums")
defer rows.Close()
if err != nil {
return err
}
for rows.Next() {
var singerId, albumId int64
var title string
err = rows.Scan(&singerId, &albumId, &title)
if err != nil {
return err
}
fmt.Printf("%v %v %v\n", singerId, albumId, title)
}
return rows.Err()
}
Node.js
import { Client } from 'pg';
async function queryData(host: string, port: number, database: string): Promise<void> {
// Connect to Spanner through PGAdapter.
const connection = new Client({
host: host,
port: port,
database: database,
});
await connection.connect();
const result = await connection.query("SELECT singer_id, album_id, album_title " +
"FROM albums");
for (const row of result.rows) {
console.log(`${row["singer_id"]} ${row["album_id"]} ${row["album_title"]}`);
}
// Close the connection.
await connection.end();
}
Python
import psycopg
def query_data(host: str, port: int, database: str):
with psycopg.connect("host={host} port={port} dbname={database} "
"sslmode=disable".format(host=host,
port=port,
database=database)) as conn:
conn.autocommit = True
with conn.cursor() as cur:
cur.execute("SELECT singer_id, album_id, album_title "
"FROM albums")
for album in cur:
print(album)
C#
using Npgsql;
namespace dotnet_snippets;
public static class QueryDataSample
{
public static void QueryData(string host, int port, string database)
{
var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
using var connection = new NpgsqlConnection(connectionString);
connection.Open();
using var cmd = new NpgsqlCommand("SELECT singer_id, album_id, album_title FROM albums", connection);
using var reader = cmd.ExecuteReader();
while (reader.Read())
{
Console.WriteLine($"{reader.GetInt64(0)} {reader.GetInt64(1)} {reader.GetString(2)}");
}
}
}
PHP
function query_data(string $host, string $port, string $database): void
{
$dsn = sprintf("pgsql:host=%s;port=%s;dbname=%s", $host, $port, $database);
$connection = new PDO($dsn);
$statement = $connection->query("SELECT singer_id, album_id, album_title "
."FROM albums "
."ORDER BY singer_id, album_id"
);
$rows = $statement->fetchAll();
foreach ($rows as $album)
{
printf("%s\t%s\t%s\n", $album["singer_id"], $album["album_id"], $album["album_title"]);
}
$rows = null;
$statement = null;
$connection = null;
}
Run the sample with the following command:
psql
PGDATABASE=example-db ./query_data.sh
Java
java -jar target/pgadapter-snippets/pgadapter-samples.jar query example-db
Go
go run sample_runner.go query example-db
Node.js
npm start query example-db
Python
python query_data.py example-db
C#
dotnet run query example-db
PHP
php query_data.php example-db
You should see the following result:
1 1 Total Junk
1 2 Go, Go, Go
2 1 Green
2 2 Forever Hold Your Peace
2 3 Terrified
Query using a SQL parameter
If your application has a frequently executed query, you can improve its performance by parameterizing it. The resulting parameterized query can be cached and reused, which reduces compilation costs. For more information, see Use query parameters to speed up frequently executed queries.
Here is an example of using a parameter in the WHERE clause to query records that contain a specific value for LastName.
psql
#!/bin/bash
export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"
# Create a prepared statement to use a query parameter.
# Using a prepared statement for executing the same SQL string multiple
# times increases the execution speed of the statement.
psql -c "PREPARE select_singer AS
SELECT singer_id, first_name, last_name
FROM singers
WHERE last_name = \$1" \
-c "EXECUTE select_singer ('Garcia')"
Java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
class QueryDataWithParameter {
static void queryDataWithParameter(String host, int port, String database) throws SQLException {
String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
try (Connection connection = DriverManager.getConnection(connectionUrl)) {
try (PreparedStatement statement =
connection.prepareStatement(
"SELECT singer_id, first_name, last_name "
+ "FROM singers "
+ "WHERE last_name = ?")) {
statement.setString(1, "Garcia");
try (ResultSet resultSet = statement.executeQuery()) {
while (resultSet.next()) {
System.out.printf(
"%d %s %s\n",
resultSet.getLong("singer_id"),
resultSet.getString("first_name"),
resultSet.getString("last_name"));
}
}
}
}
}
}
Go
import (
"context"
"fmt"
"github.com/jackc/pgx/v5"
)
func QueryDataWithParameter(host string, port int, database string) error {
ctx := context.Background()
connString := fmt.Sprintf(
"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
host, port, database)
conn, err := pgx.Connect(ctx, connString)
if err != nil {
return err
}
defer conn.Close(ctx)
rows, err := conn.Query(ctx,
"SELECT singer_id, first_name, last_name "+
"FROM singers "+
"WHERE last_name = $1", "Garcia")
defer rows.Close()
if err != nil {
return err
}
for rows.Next() {
var singerId int64
var firstName, lastName string
err = rows.Scan(&singerId, &firstName, &lastName)
if err != nil {
return err
}
fmt.Printf("%v %v %v\n", singerId, firstName, lastName)
}
return rows.Err()
}
Node.js
import { Client } from 'pg';
async function queryWithParameter(host: string, port: number, database: string): Promise<void> {
// Connect to Spanner through PGAdapter.
const connection = new Client({
host: host,
port: port,
database: database,
});
await connection.connect();
const result = await connection.query(
"SELECT singer_id, first_name, last_name " +
"FROM singers " +
"WHERE last_name = $1", ["Garcia"]);
for (const row of result.rows) {
console.log(`${row["singer_id"]} ${row["first_name"]} ${row["last_name"]}`);
}
// Close the connection.
await connection.end();
}
Python
import psycopg
def query_data_with_parameter(host: str, port: int, database: str):
with psycopg.connect("host={host} port={port} dbname={database} "
"sslmode=disable".format(host=host,
port=port,
database=database)) as conn:
conn.autocommit = True
with conn.cursor() as cur:
cur.execute("SELECT singer_id, first_name, last_name "
"FROM singers "
"WHERE last_name = %s", ("Garcia",))
for singer in cur:
print(singer)
C#
using Npgsql;
namespace dotnet_snippets;
public static class QueryDataWithParameterSample
{
public static void QueryDataWithParameter(string host, int port, string database)
{
var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
using var connection = new NpgsqlConnection(connectionString);
connection.Open();
using var cmd = new NpgsqlCommand("SELECT singer_id, first_name, last_name "
+ "FROM singers "
+ "WHERE last_name = $1", connection);
cmd.Parameters.Add(new NpgsqlParameter { Value = "Garcia" });
using var reader = cmd.ExecuteReader();
while (reader.Read())
{
Console.WriteLine($"{reader["singer_id"]} {reader["first_name"]} {reader["last_name"]}");
}
}
}
PHP
function query_data_with_parameter(string $host, string $port, string $database): void
{
$dsn = sprintf("pgsql:host=%s;port=%s;dbname=%s", $host, $port, $database);
$connection = new PDO($dsn);
$statement = $connection->prepare("SELECT singer_id, first_name, last_name "
."FROM singers "
."WHERE last_name = ?"
);
$statement->execute(["Garcia"]);
$rows = $statement->fetchAll();
foreach ($rows as $singer)
{
printf("%s\t%s\t%s\n", $singer["singer_id"], $singer["first_name"], $singer["last_name"]);
}
$rows = null;
$statement = null;
$connection = null;
}
Run the sample with the following command:
psql
PGDATABASE=example-db ./query_data_with_parameter.sh
Java
java -jar target/pgadapter-snippets/pgadapter-samples.jar querywithparameter example-db
Go
go run sample_runner.go querywithparameter example-db
Node.js
npm start querywithparameter example-db
Python
python query_data_with_parameter.py example-db
C#
dotnet run querywithparameter example-db
PHP
php query_data_with_parameter.php example-db
You should see the following result:
12 Melissa Garcia
Update the database schema
Assume you need to add a new column called MarketingBudget to the Albums table. Adding a new column to an existing table requires an update to your database schema. Spanner supports schema updates to a database while the database continues to serve traffic. Schema updates don't require taking the database offline, and they don't lock entire tables or columns; you can continue writing data to the database during the schema update. Read more about supported schema updates and schema change performance in Make schema updates.
Add a column
You can add a column on the command line using the Google Cloud CLI or programmatically using a PostgreSQL driver.
On the command line
Use the following ALTER TABLE command to add the new column to the table:
gcloud spanner databases ddl update example-db --instance=test-instance \
--ddl='ALTER TABLE albums ADD COLUMN marketing_budget BIGINT'
You should see:
Schema updating...done.
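Optionally, you can verify the schema change by printing the database's current DDL:
gcloud spanner databases ddl describe example-db --instance=test-instance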
Use the PostgreSQL driver
Execute the DDL statement using a PostgreSQL driver to modify the schema:
psql
#!/bin/bash
export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"
psql -c "ALTER TABLE albums ADD COLUMN marketing_budget bigint"
echo "Added marketing_budget column"
Java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
class AddColumn {
static void addColumn(String host, int port, String database) throws SQLException {
String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
try (Connection connection = DriverManager.getConnection(connectionUrl)) {
connection.createStatement().execute("alter table albums add column marketing_budget bigint");
System.out.println("Added marketing_budget column");
}
}
}
Go
import (
"context"
"fmt"
"github.com/jackc/pgx/v5"
)
func AddColumn(host string, port int, database string) error {
ctx := context.Background()
connString := fmt.Sprintf(
"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
host, port, database)
conn, err := pgx.Connect(ctx, connString)
if err != nil {
return err
}
defer conn.Close(ctx)
_, err = conn.Exec(ctx,
"ALTER TABLE albums "+
"ADD COLUMN marketing_budget bigint")
if err != nil {
return err
}
fmt.Println("Added marketing_budget column")
return nil
}
Node.js
import { Client } from 'pg';
async function addColumn(host: string, port: number, database: string): Promise<void> {
const connection = new Client({
host: host,
port: port,
database: database,
});
await connection.connect();
await connection.query(
"ALTER TABLE albums " +
"ADD COLUMN marketing_budget bigint");
console.log("Added marketing_budget column");
// Close the connection.
await connection.end();
}
Python
import psycopg
def add_column(host: str, port: int, database: str):
with psycopg.connect("host={host} port={port} dbname={database} "
"sslmode=disable".format(host=host,
port=port,
database=database)) as conn:
# DDL can only be executed when autocommit=True.
conn.autocommit = True
with conn.cursor() as cur:
cur.execute("ALTER TABLE albums "
"ADD COLUMN marketing_budget bigint")
print("Added marketing_budget column")
C#
using Npgsql;
namespace dotnet_snippets;
public static class AddColumnSample
{
public static void AddColumn(string host, int port, string database)
{
var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
using var connection = new NpgsqlConnection(connectionString);
connection.Open();
using var cmd = connection.CreateCommand();
cmd.CommandText = "alter table albums add column marketing_budget bigint";
cmd.ExecuteNonQuery();
Console.WriteLine("Added marketing_budget column");
}
}
PHP
function add_column(string $host, string $port, string $database): void
{
$dsn = sprintf("pgsql:host=%s;port=%s;dbname=%s", $host, $port, $database);
$connection = new PDO($dsn);
$connection->exec("ALTER TABLE albums ADD COLUMN marketing_budget bigint");
print("Added marketing_budget column\n");
$connection = null;
}
Run the sample with the following command:
psql
PGDATABASE=example-db ./add_column.sh
Java
java -jar target/pgadapter-snippets/pgadapter-samples.jar addmarketingbudget example-db
Go
go run sample_runner.go addmarketingbudget example-db
Node.js
npm start addmarketingbudget example-db
Python
python add_column.py example-db
C#
dotnet run addmarketingbudget example-db
PHP
php add_column.php example-db
You should see:
Added marketing_budget column
Execute a DDL batch
We recommend that you execute multiple schema modifications in one batch. You can use the built-in batching features of your PostgreSQL driver, submit all DDL statements as a single semicolon-separated SQL string, or use the START BATCH DDL and RUN BATCH statements to execute multiple DDL statements in a single batch.
psql
#!/bin/bash
export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"
# Use a single SQL command to batch multiple statements together.
# Executing multiple DDL statements as one batch is more efficient
# than executing each statement individually.
# Separate the statements with semicolons.
psql << SQL
CREATE TABLE venues (
venue_id bigint not null primary key,
name varchar(1024),
description jsonb
);
CREATE TABLE concerts (
concert_id bigint not null primary key ,
venue_id bigint not null,
singer_id bigint not null,
start_time timestamptz,
end_time timestamptz,
constraint fk_concerts_venues foreign key
(venue_id) references venues (venue_id),
constraint fk_concerts_singers foreign key
(singer_id) references singers (singer_id)
);
SQL
echo "Added venues and concerts tables"
Java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
class DdlBatch {
static void ddlBatch(String host, int port, String database) throws SQLException {
String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
try (Connection connection = DriverManager.getConnection(connectionUrl)) {
try (Statement statement = connection.createStatement()) {
// Create two new tables in one batch.
statement.addBatch(
"CREATE TABLE venues ("
+ " venue_id bigint not null primary key,"
+ " name varchar(1024),"
+ " description jsonb"
+ ")");
statement.addBatch(
"CREATE TABLE concerts ("
+ " concert_id bigint not null primary key ,"
+ " venue_id bigint not null,"
+ " singer_id bigint not null,"
+ " start_time timestamptz,"
+ " end_time timestamptz,"
+ " constraint fk_concerts_venues foreign key"
+ " (venue_id) references venues (venue_id),"
+ " constraint fk_concerts_singers foreign key"
+ " (singer_id) references singers (singer_id)"
+ ")");
statement.executeBatch();
}
System.out.println("Added venues and concerts tables");
}
}
}
Go
import (
"context"
"fmt"
"github.com/jackc/pgx/v5"
)
func DdlBatch(host string, port int, database string) error {
ctx := context.Background()
connString := fmt.Sprintf(
"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
host, port, database)
conn, err := pgx.Connect(ctx, connString)
if err != nil {
return err
}
defer conn.Close(ctx)
// Executing multiple DDL statements as one batch is
// more efficient than executing each statement
// individually.
br := conn.SendBatch(ctx, &pgx.Batch{QueuedQueries: []*pgx.QueuedQuery{
{SQL: "CREATE TABLE venues (" +
" venue_id bigint not null primary key," +
" name varchar(1024)," +
" description jsonb" +
")"},
{SQL: "CREATE TABLE concerts (" +
" concert_id bigint not null primary key ," +
" venue_id bigint not null," +
" singer_id bigint not null," +
" start_time timestamptz," +
" end_time timestamptz," +
" constraint fk_concerts_venues foreign key" +
" (venue_id) references venues (venue_id)," +
" constraint fk_concerts_singers foreign key" +
" (singer_id) references singers (singer_id)" +
")"},
}})
if _, err := br.Exec(); err != nil {
return err
}
if err := br.Close(); err != nil {
return err
}
fmt.Println("Added venues and concerts tables")
return nil
}
Node.js
import { Client } from 'pg';
async function ddlBatch(host: string, port: number, database: string): Promise<void> {
const connection = new Client({
host: host,
port: port,
database: database,
});
await connection.connect();
// Executing multiple DDL statements as one batch is
// more efficient than executing each statement
// individually.
await connection.query("start batch ddl");
await connection.query("CREATE TABLE venues (" +
" venue_id bigint not null primary key," +
" name varchar(1024)," +
" description jsonb" +
")");
await connection.query("CREATE TABLE concerts (" +
" concert_id bigint not null primary key ," +
" venue_id bigint not null," +
" singer_id bigint not null," +
" start_time timestamptz," +
" end_time timestamptz," +
" constraint fk_concerts_venues foreign key" +
" (venue_id) references venues (venue_id)," +
" constraint fk_concerts_singers foreign key" +
" (singer_id) references singers (singer_id)" +
")");
await connection.query("run batch");
console.log("Added venues and concerts tables");
// Close the connection.
await connection.end();
}
Python
import psycopg
def ddl_batch(host: str, port: int, database: str):
with psycopg.connect("host={host} port={port} dbname={database} "
"sslmode=disable".format(host=host,
port=port,
database=database)) as conn:
# DDL can only be executed when autocommit=True.
conn.autocommit = True
# Use a pipeline to batch multiple statements together.
# Executing multiple DDL statements as one batch is
# more efficient than executing each statement
# individually.
with conn.pipeline():
# The following statements are buffered on PGAdapter
# until the pipeline ends.
conn.execute("CREATE TABLE venues ("
" venue_id bigint not null primary key,"
" name varchar(1024),"
" description jsonb"
")")
conn.execute("CREATE TABLE concerts ("
" concert_id bigint not null primary key ,"
" venue_id bigint not null,"
" singer_id bigint not null,"
" start_time timestamptz,"
" end_time timestamptz,"
" constraint fk_concerts_venues foreign key"
" (venue_id) references venues (venue_id),"
" constraint fk_concerts_singers foreign key"
" (singer_id) references singers (singer_id)"
")")
print("Added venues and concerts tables")
C#
using Npgsql;
namespace dotnet_snippets;
public static class DdlBatchSample
{
public static void DdlBatch(string host, int port, string database)
{
var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
using var connection = new NpgsqlConnection(connectionString);
connection.Open();
// Create two new tables in one batch.
var batch = connection.CreateBatch();
batch.BatchCommands.Add(new NpgsqlBatchCommand(
"CREATE TABLE venues ("
+ " venue_id bigint not null primary key,"
+ " name varchar(1024),"
+ " description jsonb"
+ ")"));
batch.BatchCommands.Add(new NpgsqlBatchCommand(
"CREATE TABLE concerts ("
+ " concert_id bigint not null primary key ,"
+ " venue_id bigint not null,"
+ " singer_id bigint not null,"
+ " start_time timestamptz,"
+ " end_time timestamptz,"
+ " constraint fk_concerts_venues foreign key"
+ " (venue_id) references venues (venue_id),"
+ " constraint fk_concerts_singers foreign key"
+ " (singer_id) references singers (singer_id)"
+ ")"));
batch.ExecuteNonQuery();
Console.WriteLine("Added venues and concerts tables");
}
}
PHP
function ddl_batch(string $host, string $port, string $database): void
{
$dsn = sprintf("pgsql:host=%s;port=%s;dbname=%s", $host, $port, $database);
$connection = new PDO($dsn);
// Executing multiple DDL statements as one batch is
// more efficient than executing each statement
// individually.
$connection->exec("start batch ddl");
$connection->exec("CREATE TABLE venues ("
." venue_id bigint not null primary key,"
." name varchar(1024),"
." description jsonb"
.")");
$connection->exec("CREATE TABLE concerts ("
." concert_id bigint not null primary key ,"
." venue_id bigint not null,"
." singer_id bigint not null,"
." start_time timestamptz,"
." end_time timestamptz,"
." constraint fk_concerts_venues foreign key"
." (venue_id) references venues (venue_id),"
." constraint fk_concerts_singers foreign key"
." (singer_id) references singers (singer_id)"
.")");
$connection->exec("run batch");
print("Added venues and concerts tables\n");
$connection = null;
}
Run the sample with the following command:
psql
PGDATABASE=example-db ./ddl_batch.sh
Java
java -jar target/pgadapter-snippets/pgadapter-samples.jar ddlbatch example-db
Go
go run sample_runner.go ddlbatch example-db
Node.js
npm start ddlbatch example-db
Python
python ddl_batch.py example-db
C#
dotnet run ddlbatch example-db
PHP
php ddl_batch.php example-db
You should see the following output:
Added venues and concerts tables
Write data to the new column
The following code writes data to the new column. It sets MarketingBudget to 100000 for the row keyed by Albums(1, 1) and to 500000 for the row keyed by Albums(2, 2).
PGAdapter translates PostgreSQL COPY commands into mutations. By default, COPY commands are translated into Insert mutations. Execute set spanner.copy_upsert=true to translate COPY commands into InsertOrUpdate mutations instead. This can be used to update existing data in Spanner.
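As a minimal illustration of the difference, and assuming the albums rows written earlier in this tutorial still exist: a plain COPY of an existing key fails, because it is translated into Insert mutations, while the same COPY succeeds once upsert mode is enabled:
# Fails with an 'already exists' error: by default, COPY is translated
# into Insert mutations, and the row with key (1, 1) already exists.
psql -c "COPY albums (singer_id, album_id, marketing_budget) FROM STDIN
WITH (DELIMITER ';')" << DATA
1;1;100000
DATA
# Succeeds: with copy_upsert enabled, COPY is translated into
# InsertOrUpdate mutations.
psql -c "set spanner.copy_upsert=true" \
-c "COPY albums (singer_id, album_id, marketing_budget) FROM STDIN
WITH (DELIMITER ';')" << DATA
1;1;100000
DATA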
psql
#!/bin/bash
export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"
# Instruct PGAdapter to use insert-or-update for COPY statements.
# This enables us to use COPY to update data.
psql -c "set spanner.copy_upsert=true" \
-c "COPY albums (singer_id, album_id, marketing_budget) FROM STDIN
WITH (DELIMITER ';')" \
<< DATA
1;1;100000
2;2;500000
DATA
echo "Copied albums using upsert"
Java
import java.io.IOException;
import java.io.StringReader;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import org.postgresql.PGConnection;
import org.postgresql.copy.CopyManager;
class UpdateDataWithCopy {
static void updateDataWithCopy(String host, int port, String database)
throws SQLException, IOException {
String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
try (Connection connection = DriverManager.getConnection(connectionUrl)) {
// Unwrap the PostgreSQL JDBC connection interface to get access to
// a CopyManager.
PGConnection pgConnection = connection.unwrap(PGConnection.class);
CopyManager copyManager = pgConnection.getCopyAPI();
// Enable 'partitioned_non_atomic' mode. This ensures that the COPY operation
// will succeed even if it exceeds Spanner's mutation limit per transaction.
connection
.createStatement()
.execute("set spanner.autocommit_dml_mode='partitioned_non_atomic'");
// Instruct PGAdapter to use insert-or-update for COPY statements.
// This enables us to use COPY to update existing data.
connection.createStatement().execute("set spanner.copy_upsert=true");
// COPY uses mutations to insert or update existing data in Spanner.
long numAlbums =
copyManager.copyIn(
"COPY albums (singer_id, album_id, marketing_budget) FROM STDIN",
new StringReader("1\t1\t100000\n" + "2\t2\t500000\n"));
System.out.printf("Updated %d albums\n", numAlbums);
}
}
}
Go
import (
"context"
"fmt"
"io"
"github.com/jackc/pgx/v5"
)
func UpdateDataWithCopy(host string, port int, database string) error {
ctx := context.Background()
connString := fmt.Sprintf(
"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
host, port, database)
conn, err := pgx.Connect(ctx, connString)
if err != nil {
return err
}
defer conn.Close(ctx)
// Enable non-atomic mode. This makes the COPY operation non-atomic,
// and allows it to exceed the Spanner mutation limit.
if _, err := conn.Exec(ctx,
"set spanner.autocommit_dml_mode='partitioned_non_atomic'"); err != nil {
return err
}
// Instruct PGAdapter to use insert-or-update for COPY statements.
// This enables us to use COPY to update data.
if _, err := conn.Exec(ctx, "set spanner.copy_upsert=true"); err != nil {
return err
}
// Create a pipe that can be used to write the data manually that we want to copy.
reader, writer := io.Pipe()
// Write the data to the pipe using a separate goroutine. This allows us to stream the data
// to the COPY operation row-by-row.
go func() error {
for _, record := range []string{"1\t1\t100000\n", "2\t2\t500000\n"} {
if _, err := writer.Write([]byte(record)); err != nil {
return err
}
}
if err := writer.Close(); err != nil {
return err
}
return nil
}()
tag, err := conn.PgConn().CopyFrom(ctx, reader, "COPY albums (singer_id, album_id, marketing_budget) FROM STDIN")
if err != nil {
return err
}
fmt.Printf("Updated %v albums\n", tag.RowsAffected())
return nil
}
Node.js
import { Client } from 'pg';
import { pipeline } from 'node:stream/promises'
import { from as copyFrom } from 'pg-copy-streams'
import {Readable} from "stream";
async function updateDataWithCopy(host: string, port: number, database: string): Promise<void> {
const connection = new Client({
host: host,
port: port,
database: database,
});
await connection.connect();
// Enable 'partitioned_non_atomic' mode. This ensures that the COPY operation
// will succeed even if it exceeds Spanner's mutation limit per transaction.
await connection.query("set spanner.autocommit_dml_mode='partitioned_non_atomic'");
// Instruct PGAdapter to use insert-or-update for COPY statements.
// This enables us to use COPY to update existing data.
await connection.query("set spanner.copy_upsert=true");
// Copy data to Spanner using the COPY command.
const copyStream = copyFrom('COPY albums (singer_id, album_id, marketing_budget) FROM STDIN');
const ingestStream = connection.query(copyStream);
// Create a source stream and attach the source to the destination.
const sourceStream = new Readable();
const operation = pipeline(sourceStream, ingestStream);
// Manually push data to the source stream to write data to Spanner.
sourceStream.push("1\t1\t100000\n");
sourceStream.push("2\t2\t500000\n");
// Push a 'null' to indicate the end of the stream.
sourceStream.push(null);
// Wait for the copy operation to finish.
await operation;
console.log(`Updated ${copyStream.rowCount} albums`);
// Close the connection.
await connection.end();
}
Python
import psycopg
def update_data_with_copy(host: str, port: int, database: str):
with psycopg.connect("host={host} port={port} dbname={database} "
"sslmode=disable".format(host=host,
port=port,
database=database)) as conn:
conn.autocommit = True
with conn.cursor() as cur:
# Instruct PGAdapter to use insert-or-update for COPY statements.
# This enables us to use COPY to update data.
cur.execute("set spanner.copy_upsert=true")
# COPY uses mutations to insert or update existing data in Spanner.
with cur.copy("COPY albums (singer_id, album_id, marketing_budget) "
"FROM STDIN") as copy:
copy.write_row((1, 1, 100000))
copy.write_row((2, 2, 500000))
print("Updated %d albums" % cur.rowcount)
C#
using Npgsql;
namespace dotnet_snippets;
public static class UpdateDataWithCopySample
{
public static void UpdateDataWithCopy(string host, int port, string database)
{
var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
using var connection = new NpgsqlConnection(connectionString);
connection.Open();
// Enable 'partitioned_non_atomic' mode. This ensures that the COPY operation
// will succeed even if it exceeds Spanner's mutation limit per transaction.
using var cmd = connection.CreateCommand();
cmd.CommandText = "set spanner.autocommit_dml_mode='partitioned_non_atomic'";
cmd.ExecuteNonQuery();
// Instruct PGAdapter to use insert-or-update for COPY statements.
// This enables us to use COPY to update existing data.
cmd.CommandText = "set spanner.copy_upsert=true";
cmd.ExecuteNonQuery();
// COPY uses mutations to insert or update existing data in Spanner.
using (var albumWriter = connection.BeginTextImport(
"COPY albums (singer_id, album_id, marketing_budget) FROM STDIN"))
{
albumWriter.WriteLine("1\t1\t100000");
albumWriter.WriteLine("2\t2\t500000");
}
Console.WriteLine($"Updated 2 albums");
}
}
PHP
function update_data_with_copy(string $host, string $port, string $database): void
{
$dsn = sprintf("pgsql:host=%s;port=%s;dbname=%s", $host, $port, $database);
$connection = new PDO($dsn);
// Instruct PGAdapter to use insert-or-update for COPY statements.
// This enables us to use COPY to update data.
$connection->exec("set spanner.copy_upsert=true");
// COPY uses mutations to insert or update existing data in Spanner.
$connection->pgsqlCopyFromArray(
"albums",
["1\t1\t100000", "2\t2\t500000"],
"\t",
"\\\\N",
"singer_id, album_id, marketing_budget",
);
print("Updated 2 albums\n");
$connection = null;
}
Run the sample with the following command:
psql
PGDATABASE=example-db ./update_data_with_copy.sh
Java
java -jar target/pgadapter-snippets/pgadapter-samples.jar update example-db
Go
go run sample_runner.go update example-db
Node.js
npm start update example-db
Python
python update_data_with_copy.py example-db
C#
dotnet run update example-db
PHP
php update_data_with_copy.php example-db
You should see the following output:
Updated 2 albums
You can also retrieve the values that you just wrote by executing a SQL query.
Here is the code that executes the query:
psql
#!/bin/bash
export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"
psql -c "SELECT singer_id, album_id, marketing_budget
FROM albums
ORDER BY singer_id, album_id"
Java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
class QueryDataWithNewColumn {
static void queryDataWithNewColumn(String host, int port, String database) throws SQLException {
String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
try (Connection connection = DriverManager.getConnection(connectionUrl)) {
try (ResultSet resultSet =
connection
.createStatement()
.executeQuery(
"SELECT singer_id, album_id, marketing_budget "
+ "FROM albums "
+ "ORDER BY singer_id, album_id")) {
while (resultSet.next()) {
System.out.printf(
"%d %d %s\n",
resultSet.getLong("singer_id"),
resultSet.getLong("album_id"),
resultSet.getString("marketing_budget"));
}
}
}
}
}
Go
import (
"context"
"database/sql"
"fmt"
"github.com/jackc/pgx/v5"
)
func QueryDataWithNewColumn(host string, port int, database string) error {
ctx := context.Background()
connString := fmt.Sprintf(
"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
host, port, database)
conn, err := pgx.Connect(ctx, connString)
if err != nil {
return err
}
defer conn.Close(ctx)
rows, err := conn.Query(ctx, "SELECT singer_id, album_id, marketing_budget "+
"FROM albums "+
"ORDER BY singer_id, album_id")
defer rows.Close()
if err != nil {
return err
}
for rows.Next() {
var singerId, albumId int64
var marketingBudget sql.NullString
err = rows.Scan(&singerId, &albumId, &marketingBudget)
if err != nil {
return err
}
var budget string
if marketingBudget.Valid {
budget = marketingBudget.String
} else {
budget = "NULL"
}
fmt.Printf("%v %v %v\n", singerId, albumId, budget)
}
return rows.Err()
}
Node.js
import { Client } from 'pg';
async function queryDataWithNewColumn(host: string, port: number, database: string): Promise<void> {
const connection = new Client({
host: host,
port: port,
database: database,
});
await connection.connect();
const result = await connection.query(
"SELECT singer_id, album_id, marketing_budget "
+ "FROM albums "
+ "ORDER BY singer_id, album_id"
);
for (const row of result.rows) {
console.log(`${row["singer_id"]} ${row["album_id"]} ${row["marketing_budget"]}`);
}
// Close the connection.
await connection.end();
}
Python
import psycopg
def query_data_with_new_column(host: str, port: int, database: str):
with psycopg.connect("host={host} port={port} dbname={database} "
"sslmode=disable".format(host=host,
port=port,
database=database)) as conn:
conn.autocommit = True
with conn.cursor() as cur:
cur.execute("SELECT singer_id, album_id, marketing_budget "
"FROM albums "
"ORDER BY singer_id, album_id")
for album in cur:
print(album)
C#
using Npgsql;
namespace dotnet_snippets;
public static class QueryDataWithNewColumnSample
{
public static void QueryWithNewColumnData(string host, int port, string database)
{
var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
using var connection = new NpgsqlConnection(connectionString);
connection.Open();
using var cmd = new NpgsqlCommand("SELECT singer_id, album_id, marketing_budget "
+ "FROM albums "
+ "ORDER BY singer_id, album_id", connection);
using var reader = cmd.ExecuteReader();
while (reader.Read())
{
Console.WriteLine($"{reader["singer_id"]} {reader["album_id"]} {reader["marketing_budget"]}");
}
}
}
PHP
function query_data_with_new_column(string $host, string $port, string $database): void
{
$dsn = sprintf("pgsql:host=%s;port=%s;dbname=%s", $host, $port, $database);
$connection = new PDO($dsn);
$statement = $connection->query(
"SELECT singer_id, album_id, marketing_budget "
."FROM albums "
."ORDER BY singer_id, album_id"
);
$rows = $statement->fetchAll();
foreach ($rows as $album)
{
printf("%s\t%s\t%s\n", $album["singer_id"], $album["album_id"], $album["marketing_budget"]);
}
$rows = null;
$statement = null;
$connection = null;
}
Run the query with the following command:
psql
PGDATABASE=example-db ./query_data_with_new_column.sh
Java
java -jar target/pgadapter-snippets/pgadapter-samples.jar querymarketingbudget example-db
Go
go run sample_runner.go querymarketingbudget example-db
Node.js
npm start querymarketingbudget example-db
Python
python query_data_with_new_column.py example-db
C#
dotnet run querymarketingbudget example-db
PHP
php query_data_with_new_column.php example-db
You should see the following output:
1 1 100000
1 2 null
2 1 null
2 2 500000
2 3 null
Update data
You can update data using DML in a read/write transaction.
psql
#!/bin/bash
export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"
psql << SQL
-- Transfer marketing budget from one album to another.
-- We do it in a transaction to ensure that the transfer is atomic.
-- Begin a read/write transaction.
begin;
-- Increase the marketing budget of album 1 if album 2 has enough budget.
-- The condition that album 2 has enough budget is guaranteed for the
-- duration of the transaction, as read/write transactions in Spanner use
-- external consistency as the default isolation level.
update albums set
marketing_budget = marketing_budget + 200000
where singer_id = 1
and album_id = 1
and exists (
select album_id
from albums
where singer_id = 2
and album_id = 2
and marketing_budget > 200000
);
-- Decrease the marketing budget of album 2.
update albums set
marketing_budget = marketing_budget - 200000
where singer_id = 2
and album_id = 2
and marketing_budget > 200000;
-- Commit the transaction to make the changes to both marketing budgets
-- durably stored in the database and visible to other transactions.
commit;
SQL
echo "Transferred marketing budget from Album 2 to Album 1"
Java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
class UpdateDataWithTransaction {
static void writeWithTransactionUsingDml(String host, int port, String database)
throws SQLException {
String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
try (Connection connection = DriverManager.getConnection(connectionUrl)) {
// Set AutoCommit=false to enable transactions.
connection.setAutoCommit(false);
// Transfer marketing budget from one album to another. We do it in a
// transaction to ensure that the transfer is atomic. There is no need
// to explicitly start the transaction. The first statement on the
// connection will start a transaction when AutoCommit=false.
String selectMarketingBudgetSql =
"SELECT marketing_budget from albums WHERE singer_id = ? and album_id = ?";
long album2Budget = 0;
try (PreparedStatement selectMarketingBudgetStatement =
connection.prepareStatement(selectMarketingBudgetSql)) {
// Bind the query parameters to SingerId=2 and AlbumId=2.
selectMarketingBudgetStatement.setLong(1, 2);
selectMarketingBudgetStatement.setLong(2, 2);
try (ResultSet resultSet = selectMarketingBudgetStatement.executeQuery()) {
while (resultSet.next()) {
album2Budget = resultSet.getLong("marketing_budget");
}
}
// The transaction will only be committed if this condition still holds
// at the time of commit. Otherwise, the transaction will be aborted.
final long transfer = 200000;
if (album2Budget >= transfer) {
long album1Budget = 0;
// Re-use the existing PreparedStatement for selecting the
// marketing_budget to get the budget for Album 1.
// Bind the query parameters to SingerId=1 and AlbumId=1.
selectMarketingBudgetStatement.setLong(1, 1);
selectMarketingBudgetStatement.setLong(2, 1);
try (ResultSet resultSet = selectMarketingBudgetStatement.executeQuery()) {
while (resultSet.next()) {
album1Budget = resultSet.getLong("marketing_budget");
}
}
// Transfer part of the marketing budget of Album 2 to Album 1.
album1Budget += transfer;
album2Budget -= transfer;
String updateSql =
"UPDATE albums "
+ "SET marketing_budget = ? "
+ "WHERE singer_id = ? and album_id = ?";
try (PreparedStatement updateStatement = connection.prepareStatement(updateSql)) {
// Update Album 1.
int paramIndex = 0;
updateStatement.setLong(++paramIndex, album1Budget);
updateStatement.setLong(++paramIndex, 1);
updateStatement.setLong(++paramIndex, 1);
// Create a DML batch by calling addBatch
// on the current PreparedStatement.
updateStatement.addBatch();
// Update Album 2 in the same DML batch.
paramIndex = 0;
updateStatement.setLong(++paramIndex, album2Budget);
updateStatement.setLong(++paramIndex, 2);
updateStatement.setLong(++paramIndex, 2);
updateStatement.addBatch();
// Execute both DML statements in one batch.
updateStatement.executeBatch();
}
}
}
// Commit the current transaction.
connection.commit();
System.out.println("Transferred marketing budget from Album 2 to Album 1");
}
}
}
Go
import (
"context"
"fmt"
"github.com/jackc/pgx/v5"
)
func WriteWithTransactionUsingDml(host string, port int, database string) error {
ctx := context.Background()
connString := fmt.Sprintf(
"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
host, port, database)
conn, err := pgx.Connect(ctx, connString)
if err != nil {
return err
}
defer conn.Close(ctx)
// Transfer marketing budget from one album to another. We do it in a
// transaction to ensure that the transfer is atomic.
tx, err := conn.Begin(ctx)
if err != nil {
return err
}
const selectSql = "SELECT marketing_budget " +
"from albums " +
"WHERE singer_id = $1 and album_id = $2"
// Get the marketing_budget of singer 2 / album 2.
row := tx.QueryRow(ctx, selectSql, 2, 2)
var budget2 int64
if err := row.Scan(&budget2); err != nil {
tx.Rollback(ctx)
return err
}
const transfer = 200000
// The transaction will only be committed if this condition still holds
// at the time of commit. Otherwise, the transaction will be aborted.
if budget2 >= transfer {
// Get the marketing_budget of singer 1 / album 1.
row := tx.QueryRow(ctx, selectSql, 1, 1)
var budget1 int64
if err := row.Scan(&budget1); err != nil {
tx.Rollback(ctx)
return err
}
// Transfer part of the marketing budget of Album 2 to Album 1.
budget1 += transfer
budget2 -= transfer
const updateSql = "UPDATE albums " +
"SET marketing_budget = $1 " +
"WHERE singer_id = $2 and album_id = $3"
// Start a DML batch and execute it as part of the current transaction.
batch := &pgx.Batch{}
batch.Queue(updateSql, budget1, 1, 1)
batch.Queue(updateSql, budget2, 2, 2)
br := tx.SendBatch(ctx, batch)
// Check both the execution result and the result of closing the batch.
_, err = br.Exec()
if err == nil {
err = br.Close()
}
if err != nil {
tx.Rollback(ctx)
return err
}
}
// Commit the current transaction.
if err := tx.Commit(ctx); err != nil {
return err
}
fmt.Println("Transferred marketing budget from Album 2 to Album 1")
return nil
}
Node.js
import { Client } from 'pg';
async function writeWithTransactionUsingDml(host: string, port: number, database: string): Promise<void> {
const connection = new Client({
host: host,
port: port,
database: database,
});
await connection.connect();
// Transfer marketing budget from one album to another. We do it in a
// transaction to ensure that the transfer is atomic. node-postgres
// requires you to explicitly start the transaction by executing 'begin'.
await connection.query("begin");
const selectMarketingBudgetSql = "SELECT marketing_budget " +
"from albums " +
"WHERE singer_id = $1 and album_id = $2";
// Get the marketing_budget of singer 2 / album 2.
const album2BudgetResult = await connection.query(selectMarketingBudgetSql, [2, 2]);
// node-postgres returns bigint values as strings, so convert the value
// to a number before doing arithmetic with it.
let album2Budget = Number(album2BudgetResult.rows[0]["marketing_budget"]);
const transfer = 200000;
// The transaction will only be committed if this condition still holds
// at the time of commit. Otherwise, the transaction will be aborted.
if (album2Budget >= transfer) {
// Get the marketing budget of singer 1 / album 1.
const album1BudgetResult = await connection.query(selectMarketingBudgetSql, [1, 1]);
let album1Budget = Number(album1BudgetResult.rows[0]["marketing_budget"]);
// Transfer part of the marketing budget of Album 2 to Album 1.
album1Budget += transfer;
album2Budget -= transfer;
const updateSql = "UPDATE albums " +
"SET marketing_budget = $1 " +
"WHERE singer_id = $2 and album_id = $3";
// Start a DML batch. This batch will become part of the current transaction.
// TODO: Enable when https://github.com/googleapis/java-spanner/pull/3114 has been merged
// await connection.query("start batch dml");
// Update the marketing budget of both albums.
await connection.query(updateSql, [album1Budget, 1, 1]);
await connection.query(updateSql, [album2Budget, 2, 2]);
// TODO: Enable when https://github.com/googleapis/java-spanner/pull/3114 has been merged
// await connection.query("run batch");
}
// Commit the current transaction.
await connection.query("commit");
console.log("Transferred marketing budget from Album 2 to Album 1");
// Close the connection.
await connection.end();
}
Python
import psycopg
def update_data_with_transaction(host: str, port: int, database: str):
with psycopg.connect("host={host} port={port} dbname={database} "
"sslmode=disable".format(host=host,
port=port,
database=database)) as conn:
# Set autocommit=False to use transactions.
# The first statement that is executed starts the transaction.
conn.autocommit = False
with conn.cursor() as cur:
# Transfer marketing budget from one album to another.
# We do it in a transaction to ensure that the transfer is atomic.
# There is no need to explicitly start the transaction. The first
# statement on the connection will start a transaction when
# AutoCommit=false.
select_marketing_budget_sql = ("SELECT marketing_budget "
"from albums "
"WHERE singer_id = %s "
"and album_id = %s")
# Get the marketing budget of Album #2.
cur.execute(select_marketing_budget_sql, (2, 2))
album2_budget = cur.fetchone()[0]
transfer = 200000
if album2_budget > transfer:
# Get the marketing budget of Album #1.
cur.execute(select_marketing_budget_sql, (1, 1))
album1_budget = cur.fetchone()[0]
# Transfer the marketing budgets and write the update back
# to the database.
album1_budget += transfer
album2_budget -= transfer
update_sql = ("update albums "
"set marketing_budget = %s "
"where singer_id = %s "
"and album_id = %s")
# Use a pipeline to execute two DML statements in one batch.
with conn.pipeline():
cur.execute(update_sql, (album1_budget, 1, 1,))
cur.execute(update_sql, (album2_budget, 2, 2,))
else:
print("Insufficient budget to transfer")
# Commit the transaction.
conn.commit()
print("Transferred marketing budget from Album 2 to Album 1")
C#
using Npgsql;
namespace dotnet_snippets;
public static class WriteDataWithTransactionSample
{
public static void WriteWithTransactionUsingDml(string host, int port, string database)
{
var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
using var connection = new NpgsqlConnection(connectionString);
connection.Open();
// Transfer marketing budget from one album to another. We do it in a
// transaction to ensure that the transfer is atomic.
var transaction = connection.BeginTransaction();
using var selectCommand = connection.CreateCommand();
selectCommand.Transaction = transaction;
selectCommand.CommandText = "select marketing_budget from albums where singer_id=$1 and album_id=$2";
// Get the marketing_budget of singer 2 / album 2.
selectCommand.Parameters.Add(new NpgsqlParameter { Value = 2L });
selectCommand.Parameters.Add(new NpgsqlParameter { Value = 2L });
var album2Budget = (long)(selectCommand.ExecuteScalar() ?? 0L);
const long transfer = 200000L;
// The transaction will only be committed if this condition still holds
// at the time of commit. Otherwise, the transaction will be aborted.
if (album2Budget >= transfer)
{
// Get the marketing_budget of singer 1 / album 1.
selectCommand.Parameters[0].Value = 1L;
selectCommand.Parameters[1].Value = 1L;
var album1Budget = (long)(selectCommand.ExecuteScalar() ?? 0L);
// Transfer part of the marketing budget of Album 2 to Album 1.
album1Budget += transfer;
album2Budget -= transfer;
// Update both albums in a single DML batch.
var batch = connection.CreateBatch();
batch.Transaction = transaction;
const string updateSql = "update albums set marketing_budget=$1 where singer_id=$2 and album_id=$3";
var updateAlbum1 = new NpgsqlBatchCommand(updateSql);
updateAlbum1.Parameters.Add(new NpgsqlParameter { Value = album1Budget });
updateAlbum1.Parameters.Add(new NpgsqlParameter { Value = 1L });
updateAlbum1.Parameters.Add(new NpgsqlParameter { Value = 1L });
batch.BatchCommands.Add(updateAlbum1);
var updateAlbum2 = new NpgsqlBatchCommand(updateSql);
updateAlbum2.Parameters.Add(new NpgsqlParameter { Value = album2Budget });
updateAlbum2.Parameters.Add(new NpgsqlParameter { Value = 2L });
updateAlbum2.Parameters.Add(new NpgsqlParameter { Value = 2L });
batch.BatchCommands.Add(updateAlbum2);
batch.ExecuteNonQuery();
}
// Commit the current transaction.
transaction.Commit();
Console.WriteLine("Transferred marketing budget from Album 2 to Album 1");
}
}
PHP
function update_data_with_transaction(string $host, string $port, string $database): void
{
$dsn = sprintf("pgsql:host=%s;port=%s;dbname=%s", $host, $port, $database);
$connection = new PDO($dsn);
// Start a read/write transaction.
$connection->beginTransaction();
// Transfer marketing budget from one album to another.
// We do it in a transaction to ensure that the transfer is atomic.
// Create a prepared statement that we can use to execute the same
// SQL string multiple times with different parameter values.
$select_marketing_budget_statement = $connection->prepare(
"SELECT marketing_budget "
."from albums "
."WHERE singer_id = ? "
."and album_id = ?"
);
// Get the marketing budget of Album #2.
$select_marketing_budget_statement->execute([2, 2]);
$album2_budget = $select_marketing_budget_statement->fetchAll()[0][0];
$select_marketing_budget_statement->closeCursor();
$transfer = 200000;
if ($album2_budget > $transfer) {
// Get the marketing budget of Album #1.
$select_marketing_budget_statement->execute([1, 1]);
$album1_budget = $select_marketing_budget_statement->fetchAll()[0][0];
$select_marketing_budget_statement->closeCursor();
// Transfer the marketing budgets and write the update back
// to the database.
$album1_budget += $transfer;
$album2_budget -= $transfer;
// PHP PDO also supports named query parameters.
$update_statement = $connection->prepare(
"update albums "
."set marketing_budget = :budget "
."where singer_id = :singer_id "
."and album_id = :album_id"
);
// Start a DML batch. This batch will become part of the current transaction.
// $connection->exec("start batch dml");
// Update the marketing budget of both albums.
$update_statement->execute(["budget" => $album1_budget, "singer_id" => 1, "album_id" => 1]);
$update_statement->execute(["budget" => $album2_budget, "singer_id" => 2, "album_id" => 2]);
// $connection->exec("run batch");
} else {
print("Insufficient budget to transfer\n");
}
// Commit the transaction.
$connection->commit();
print("Transferred marketing budget from Album 2 to Album 1\n");
$connection = null;
}
Run the sample with the following command:
psql
PGDATABASE=example-db ./update_data_with_transaction.sh
Java
java -jar target/pgadapter-snippets/pgadapter-samples.jar writewithtransactionusingdml example-db
Go
go run sample_runner.go writewithtransactionusingdml example-db
Node.js
npm start writewithtransactionusingdml example-db
Python
python update_data_with_transaction.py example-db
C#
dotnet run writewithtransactionusingdml example-db
PHP
php update_data_with_transaction.php example-db
You should see the following output:
Transferred marketing budget from Album 2 to Album 1
Transaction tags and request tags
Use transaction tags and request tags to troubleshoot transactions and queries in Spanner. You can set transaction tags and request tags with the SPANNER.TRANSACTION_TAG and SPANNER.STATEMENT_TAG session variables.
psql
#!/bin/bash
export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"
psql << SQL
-- Start a transaction.
begin;
-- Set the TRANSACTION_TAG session variable to set a transaction tag
-- for the current transaction. This can only be executed at the start
-- of the transaction.
set spanner.transaction_tag='example-tx-tag';
-- Set the STATEMENT_TAG session variable to set the request tag
-- that should be included with the next SQL statement.
set spanner.statement_tag='query-marketing-budget';
select marketing_budget
from albums
where singer_id = 1
and album_id = 1;
-- Reduce the marketing budget by 10% if it is more than 1,000.
-- Set a statement tag for the update statement.
set spanner.statement_tag='reduce-marketing-budget';
update albums
set marketing_budget = marketing_budget - (marketing_budget * 0.1)::bigint
where singer_id = 1
and album_id = 1
and marketing_budget > 1000;
commit;
SQL
echo "Reduced marketing budget"
Java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
class Tags {
static void tags(String host, int port, String database) throws SQLException {
String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
try (Connection connection = DriverManager.getConnection(connectionUrl)) {
// Set AutoCommit=false to enable transactions.
connection.setAutoCommit(false);
// Set the TRANSACTION_TAG session variable to set a transaction tag
// for the current transaction.
connection.createStatement().execute("set spanner.transaction_tag='example-tx-tag'");
// Set the STATEMENT_TAG session variable to set the request tag
// that should be included with the next SQL statement.
connection.createStatement().execute("set spanner.statement_tag='query-marketing-budget'");
long marketingBudget = 0L;
long singerId = 1L;
long albumId = 1L;
try (PreparedStatement statement =
connection.prepareStatement(
"select marketing_budget from albums where singer_id=? and album_id=?")) {
statement.setLong(1, singerId);
statement.setLong(2, albumId);
try (ResultSet albumResultSet = statement.executeQuery()) {
while (albumResultSet.next()) {
marketingBudget = albumResultSet.getLong(1);
}
}
}
// Reduce the marketing budget by 10% if it is more than 1,000.
final long maxMarketingBudget = 1000L;
final float reduction = 0.1f;
if (marketingBudget > maxMarketingBudget) {
marketingBudget -= (long) (marketingBudget * reduction);
connection.createStatement().execute("set spanner.statement_tag='reduce-marketing-budget'");
try (PreparedStatement statement =
connection.prepareStatement(
"update albums set marketing_budget=? where singer_id=? AND album_id=?")) {
int paramIndex = 0;
statement.setLong(++paramIndex, marketingBudget);
statement.setLong(++paramIndex, singerId);
statement.setLong(++paramIndex, albumId);
statement.executeUpdate();
}
}
// Commit the current transaction.
connection.commit();
System.out.println("Reduced marketing budget");
}
}
}
Go
import (
"context"
"fmt"
"github.com/jackc/pgx/v5"
)
func Tags(host string, port int, database string) error {
ctx := context.Background()
connString := fmt.Sprintf(
"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
host, port, database)
conn, err := pgx.Connect(ctx, connString)
if err != nil {
return err
}
defer conn.Close(ctx)
tx, err := conn.Begin(ctx)
if err != nil {
return err
}
// Set the TRANSACTION_TAG session variable to set a transaction tag
// for the current transaction.
_, _ = tx.Exec(ctx, "set spanner.transaction_tag='example-tx-tag'")
// Set the STATEMENT_TAG session variable to set the request tag
// that should be included with the next SQL statement.
_, _ = tx.Exec(ctx, "set spanner.statement_tag='query-marketing-budget'")
row := tx.QueryRow(ctx, "select marketing_budget "+
"from albums "+
"where singer_id=$1 and album_id=$2", 1, 1)
var budget int64
if err := row.Scan(&budget); err != nil {
tx.Rollback(ctx)
return err
}
// Reduce the marketing budget by 10% if it is more than 1,000.
if budget > 1000 {
budget = int64(float64(budget) - float64(budget)*0.1)
_, _ = tx.Exec(ctx, "set spanner.statement_tag='reduce-marketing-budget'")
if _, err := tx.Exec(ctx, "update albums set marketing_budget=$1 where singer_id=$2 AND album_id=$3", budget, 1, 1); err != nil {
tx.Rollback(ctx)
return err
}
}
// Commit the current transaction.
if err := tx.Commit(ctx); err != nil {
return err
}
fmt.Println("Reduced marketing budget")
return nil
}
Node.js
import { Client } from 'pg';
async function tags(host: string, port: number, database: string): Promise<void> {
const connection = new Client({
host: host,
port: port,
database: database,
});
await connection.connect();
await connection.query("begin");
// Set the TRANSACTION_TAG session variable to set a transaction tag
// for the current transaction.
await connection.query("set spanner.transaction_tag='example-tx-tag'");
// Set the STATEMENT_TAG session variable to set the request tag
// that should be included with the next SQL statement.
await connection.query("set spanner.statement_tag='query-marketing-budget'");
const budgetResult = await connection.query(
"select marketing_budget " +
"from albums " +
"where singer_id=$1 and album_id=$2", [1, 1])
let budget = budgetResult.rows[0]["marketing_budget"];
// Reduce the marketing budget by 10% if it is more than 1,000.
if (budget > 1000) {
budget = budget - budget * 0.1;
await connection.query("set spanner.statement_tag='reduce-marketing-budget'");
await connection.query("update albums set marketing_budget=$1 "
+ "where singer_id=$2 AND album_id=$3", [budget, 1, 1]);
}
// Commit the current transaction.
await connection.query("commit");
console.log("Reduced marketing budget");
// Close the connection.
await connection.end();
}
Python
import psycopg
def tags(host: str, port: int, database: str):
with psycopg.connect("host={host} port={port} dbname={database} "
"sslmode=disable".format(host=host,
port=port,
database=database)) as conn:
# Set autocommit=False to enable transactions.
conn.autocommit = False
with conn.cursor() as cur:
# Set the TRANSACTION_TAG session variable to set a transaction tag
# for the current transaction.
cur.execute("set spanner.transaction_TAG='example-tx-tag'")
# Set the STATEMENT_TAG session variable to set the request tag
# that should be included with the next SQL statement.
cur.execute("set spanner.statement_tag='query-marketing-budget'")
singer_id = 1
album_id = 1
cur.execute("select marketing_budget "
"from albums "
"where singer_id = %s "
" and album_id = %s",
(singer_id, album_id,))
marketing_budget = cur.fetchone()[0]
# Reduce the marketing budget by 10% if it is more than 1,000.
max_marketing_budget = 1000
reduction = 0.1
if marketing_budget > max_marketing_budget:
# Make sure the marketing_budget remains an int.
marketing_budget -= int(marketing_budget * reduction)
# Set a statement tag for the update statement.
cur.execute(
"set spanner.statement_tag='reduce-marketing-budget'")
cur.execute("update albums set marketing_budget = %s "
"where singer_id = %s "
" and album_id = %s",
(marketing_budget, singer_id, album_id,))
else:
print("Marketing budget already less than or equal to 1,000")
# Commit the transaction.
conn.commit()
print("Reduced marketing budget")
C#
using Npgsql;
using System.Data;
namespace dotnet_snippets;
public static class TagsSample
{
public static void Tags(string host, int port, string database)
{
var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
using var connection = new NpgsqlConnection(connectionString);
connection.Open();
// Start a transaction with isolation level Serializable.
// Spanner only supports this isolation level. Trying to use a lower
// isolation level (including the default isolation level READ COMMITTED),
// will result in an error.
var transaction = connection.BeginTransaction(IsolationLevel.Serializable);
// Create a command that uses the current transaction.
using var cmd = connection.CreateCommand();
cmd.Transaction = transaction;
// Set the TRANSACTION_TAG session variable to set a transaction tag
// for the current transaction.
cmd.CommandText = "set spanner.transaction_tag='example-tx-tag'";
cmd.ExecuteNonQuery();
// Set the STATEMENT_TAG session variable to set the request tag
// that should be included with the next SQL statement.
cmd.CommandText = "set spanner.statement_tag='query-marketing-budget'";
cmd.ExecuteNonQuery();
// Get the marketing_budget of Album (1,1).
cmd.CommandText = "select marketing_budget from albums where singer_id=$1 and album_id=$2";
cmd.Parameters.Add(new NpgsqlParameter { Value = 1L });
cmd.Parameters.Add(new NpgsqlParameter { Value = 1L });
var marketingBudget = (long?)cmd.ExecuteScalar();
// Reduce the marketing budget by 10% if it is more than 1,000.
if (marketingBudget > 1000L)
{
marketingBudget -= (long) (marketingBudget * 0.1);
// Set the statement tag to use for the update statement.
cmd.Parameters.Clear();
cmd.CommandText = "set spanner.statement_tag='reduce-marketing-budget'";
cmd.ExecuteNonQuery();
cmd.CommandText = "update albums set marketing_budget=$1 where singer_id=$2 AND album_id=$3";
cmd.Parameters.Add(new NpgsqlParameter { Value = marketingBudget });
cmd.Parameters.Add(new NpgsqlParameter { Value = 1L });
cmd.Parameters.Add(new NpgsqlParameter { Value = 1L });
cmd.ExecuteNonQuery();
}
// Commit the current transaction.
transaction.Commit();
Console.WriteLine("Reduced marketing budget");
}
}
PHP
function tags(string $host, string $port, string $database): void
{
$dsn = sprintf("pgsql:host=%s;port=%s;dbname=%s", $host, $port, $database);
$connection = new PDO($dsn);
// Start a read/write transaction.
$connection->beginTransaction();
// Set the TRANSACTION_TAG session variable to set a transaction tag
// for the current transaction.
$connection->exec("set spanner.transaction_TAG='example-tx-tag'");
// Set the STATEMENT_TAG session variable to set the request tag
// that should be included with the next SQL statement.
$connection->exec("set spanner.statement_tag='query-marketing-budget'");
$singer_id = 1;
$album_id = 1;
$statement = $connection->prepare(
"select marketing_budget "
."from albums "
."where singer_id = ? "
." and album_id = ?"
);
$statement->execute([1, 1]);
$marketing_budget = $statement->fetchAll()[0][0];
$statement->closeCursor();
# Reduce the marketing budget by 10% if it is more than 1,000.
$max_marketing_budget = 1000;
$reduction = 0.1;
if ($marketing_budget > $max_marketing_budget) {
// Make sure the marketing_budget remains an int.
$marketing_budget -= intval($marketing_budget * $reduction);
// Set a statement tag for the update statement.
$connection->exec("set spanner.statement_tag='reduce-marketing-budget'");
$update_statement = $connection->prepare(
"update albums set marketing_budget = :budget "
."where singer_id = :singer_id "
." and album_id = :album_id"
);
$update_statement->execute([
"budget" => $marketing_budget,
"singer_id" => $singer_id,
"album_id" => $album_id,
]);
} else {
print("Marketing budget already less than or equal to 1,000\n");
}
// Commit the transaction.
$connection->commit();
print("Reduced marketing budget\n");
$connection = null;
}
Run the sample with the following command:
psql
PGDATABASE=example-db ./tags.sh
Java
java -jar target/pgadapter-snippets/pgadapter-samples.jar tags example-db
Go
go run sample_runner.go tags example-db
Node.js
npm start tags example-db
Python
python tags.py example-db
C#
dotnet run tags example-db
PHP
php tags.php example-db
Retrieve data using read-only transactions
Suppose you want to execute more than one read at the same timestamp. Read-only transactions observe a consistent prefix of the transaction commit history, so your application always gets consistent data. Set the connection to read-only, or use the SET TRANSACTION READ ONLY SQL statement, to execute a read-only transaction.
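Marking the whole connection read-only makes every subsequent transaction on it a read-only transaction. A minimal psql sketch; the spanner.readonly session variable is an assumption based on PGAdapter's documented session variables, so verify it against the PGAdapter documentation for your version:
psql << SQL
-- Mark the connection as read-only (assumed PGAdapter session variable).
set spanner.readonly = true;
begin;
-- This transaction now executes as a read-only transaction.
select singer_id, album_id, album_title
from albums
order by singer_id, album_id;
commit;
SQL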
The following shows how to run a query and perform a read in the same read-only transaction:
psql
#!/bin/bash
export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"
psql << SQL
-- Begin a transaction.
begin;
-- Change the current transaction to a read-only transaction.
-- This statement can only be executed at the start of a transaction.
set transaction read only;
-- The following two queries use the same read-only transaction.
select singer_id, album_id, album_title
from albums
order by singer_id, album_id;
select singer_id, album_id, album_title
from albums
order by album_title;
-- Read-only transactions must also be committed or rolled back to mark
-- the end of the transaction. There is no semantic difference between
-- rolling back or committing a read-only transaction.
commit;
SQL
Java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
class ReadOnlyTransaction {
static void readOnlyTransaction(String host, int port, String database) throws SQLException {
String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
try (Connection connection = DriverManager.getConnection(connectionUrl)) {
// Set AutoCommit=false to enable transactions.
connection.setAutoCommit(false);
// This SQL statement instructs the JDBC driver to use
// a read-only transaction.
connection.createStatement().execute("set transaction read only");
try (ResultSet resultSet =
connection
.createStatement()
.executeQuery(
"SELECT singer_id, album_id, album_title "
+ "FROM albums "
+ "ORDER BY singer_id, album_id")) {
while (resultSet.next()) {
System.out.printf(
"%d %d %s\n",
resultSet.getLong("singer_id"),
resultSet.getLong("album_id"),
resultSet.getString("album_title"));
}
}
try (ResultSet resultSet =
connection
.createStatement()
.executeQuery(
"SELECT singer_id, album_id, album_title "
+ "FROM albums "
+ "ORDER BY album_title")) {
while (resultSet.next()) {
System.out.printf(
"%d %d %s\n",
resultSet.getLong("singer_id"),
resultSet.getLong("album_id"),
resultSet.getString("album_title"));
}
}
// End the read-only transaction by calling commit().
connection.commit();
}
}
}
Go
import (
"context"
"fmt"
"github.com/jackc/pgx/v5"
)
func ReadOnlyTransaction(host string, port int, database string) error {
ctx := context.Background()
connString := fmt.Sprintf(
"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
host, port, database)
conn, err := pgx.Connect(ctx, connString)
if err != nil {
return err
}
defer conn.Close(ctx)
// Start a read-only transaction by supplying additional transaction options.
tx, err := conn.BeginTx(ctx, pgx.TxOptions{AccessMode: pgx.ReadOnly})
if err != nil {
return err
}
albumsOrderedById, err := tx.Query(ctx, "SELECT singer_id, album_id, album_title FROM albums ORDER BY singer_id, album_id")
defer albumsOrderedById.Close()
if err != nil {
return err
}
for albumsOrderedById.Next() {
var singerId, albumId int64
var title string
err = albumsOrderedById.Scan(&singerId, &albumId, &title)
if err != nil {
return err
}
fmt.Printf("%v %v %v\n", singerId, albumId, title)
}
albumsOrderedTitle, err := tx.Query(ctx, "SELECT singer_id, album_id, album_title FROM albums ORDER BY album_title")
defer albumsOrderedTitle.Close()
if err != nil {
return err
}
for albumsOrderedTitle.Next() {
var singerId, albumId int64
var title string
err = albumsOrderedTitle.Scan(&singerId, &albumId, &title)
if err != nil {
return err
}
fmt.Printf("%v %v %v\n", singerId, albumId, title)
}
// End the read-only transaction by calling Commit().
return tx.Commit(ctx)
}
Node.js
import { Client } from 'pg';
async function readOnlyTransaction(host: string, port: number, database: string): Promise<void> {
const connection = new Client({
host: host,
port: port,
database: database,
});
await connection.connect();
// Start a transaction.
await connection.query("begin");
// This SQL statement instructs the PGAdapter to make it a read-only transaction.
await connection.query("set transaction read only");
const albumsOrderById = await connection.query(
"SELECT singer_id, album_id, album_title "
+ "FROM albums "
+ "ORDER BY singer_id, album_id");
for (const row of albumsOrderById.rows) {
console.log(`${row["singer_id"]} ${row["album_id"]} ${row["album_title"]}`);
}
const albumsOrderByTitle = await connection.query(
"SELECT singer_id, album_id, album_title "
+ "FROM albums "
+ "ORDER BY album_title");
for (const row of albumsOrderByTitle.rows) {
console.log(`${row["singer_id"]} ${row["album_id"]} ${row["album_title"]}`);
}
// End the read-only transaction by executing commit.
await connection.query("commit");
// Close the connection.
await connection.end();
}
Python
import psycopg
def read_only_transaction(host: str, port: int, database: str):
with (psycopg.connect("host={host} port={port} dbname={database} "
"sslmode=disable".format(host=host,
port=port,
database=database)) as conn):
# Set autocommit=False to enable transactions.
conn.autocommit = False
with conn.cursor() as cur:
# Change the current transaction to a read-only transaction.
# This statement can only be executed at the start of a transaction.
cur.execute("set transaction read only")
# The following two queries use the same read-only transaction.
cur.execute("select singer_id, album_id, album_title "
"from albums "
"order by singer_id, album_id")
for album in cur:
print(album)
cur.execute("select singer_id, album_id, album_title "
"from albums "
"order by album_title")
for album in cur:
print(album)
# Read-only transactions must also be committed or rolled back to mark
# the end of the transaction. There is no semantic difference between
# rolling back or committing a read-only transaction.
conn.commit()
C#
using Npgsql;
using System.Data;
namespace dotnet_snippets;
public static class ReadOnlyTransactionSample
{
public static void ReadOnlyTransaction(string host, int port, string database)
{
var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
using var connection = new NpgsqlConnection(connectionString);
connection.Open();
// Start a read-only transaction.
// You must specify Serializable as the isolation level, as the npgsql driver
// will otherwise automatically set the isolation level to read-committed.
var transaction = connection.BeginTransaction(IsolationLevel.Serializable);
using var cmd = connection.CreateCommand();
cmd.Transaction = transaction;
// This SQL statement instructs the npgsql driver to use
// a read-only transaction.
cmd.CommandText = "set transaction read only";
cmd.ExecuteNonQuery();
cmd.CommandText = "SELECT singer_id, album_id, album_title " +
"FROM albums " +
"ORDER BY singer_id, album_id";
using (var reader = cmd.ExecuteReader())
{
while (reader.Read())
{
Console.WriteLine($"{reader["singer_id"]} {reader["album_id"]} {reader["album_title"]}");
}
}
cmd.CommandText = "SELECT singer_id, album_id, album_title "
+ "FROM albums "
+ "ORDER BY album_title";
using (var reader = cmd.ExecuteReader())
{
while (reader.Read())
{
Console.WriteLine($"{reader["singer_id"]} {reader["album_id"]} {reader["album_title"]}");
}
}
// End the read-only transaction by calling commit().
transaction.Commit();
}
}
PHP
function read_only_transaction(string $host, string $port, string $database): void
{
$dsn = sprintf("pgsql:host=%s;port=%s;dbname=%s", $host, $port, $database);
$connection = new PDO($dsn);
// Start a transaction.
$connection->beginTransaction();
// Change the current transaction to a read-only transaction.
// This statement can only be executed at the start of a transaction.
$connection->exec("set transaction read only");
// The following two queries use the same read-only transaction.
$statement = $connection->query(
"select singer_id, album_id, album_title "
."from albums "
."order by singer_id, album_id"
);
$rows = $statement->fetchAll();
foreach ($rows as $album)
{
printf("%s\t%s\t%s\n", $album["singer_id"], $album["album_id"], $album["album_title"]);
}
$statement = $connection->query(
"select singer_id, album_id, album_title "
."from albums "
."order by album_title"
);
$rows = $statement->fetchAll();
foreach ($rows as $album)
{
printf("%s\t%s\t%s\n", $album["singer_id"], $album["album_id"], $album["album_title"]);
}
# Read-only transactions must also be committed or rolled back to mark
# the end of the transaction. There is no semantic difference between
# rolling back or committing a read-only transaction.
$connection->commit();
$rows = null;
$statement = null;
$connection = null;
}
Run the sample with the following command:
psql
PGDATABASE=example-db ./read_only_transaction.sh
Java
java -jar target/pgadapter-snippets/pgadapter-samples.jar readonlytransaction example-db
Go
go run sample_runner.go readonlytransaction example-db
Node.js
npm start readonlytransaction example-db
Python
python read_only_transaction.py example-db
C#
dotnet run readonlytransaction example-db
PHP
php read_only_transaction.php example-db
You should see output similar to the following:
1 1 Total Junk
1 2 Go, Go, Go
2 1 Green
2 2 Forever Hold Your Peace
2 3 Terrified
2 2 Forever Hold Your Peace
1 2 Go, Go, Go
2 1 Green
2 3 Terrified
1 1 Total Junk
Partitioned queries and Data Boost
The partitionQuery API divides a query into smaller pieces, or partitions, and uses multiple machines to fetch the partitions in parallel. Each partition is identified by a partition token. The PartitionQuery API has higher latency than the standard query API, because it is only intended for bulk operations such as exporting or scanning the whole database.
Data Boost lets you execute analytics queries and data exports with near-zero impact on existing workloads on your provisioned Spanner instance. Data Boost only supports partitioned queries.
psql
#!/bin/bash
export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"
# 'set spanner.data_boost_enabled=true' enables Data Boost for
# all partitioned queries on this connection.
# 'run partitioned query' is a shortcut for partitioning the query
# that follows and executing each of the partitions that is returned
# by Spanner.
psql -c "set spanner.data_boost_enabled=true" \
-c "run partitioned query
select singer_id, first_name, last_name
from singers"
Java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
class DataBoost {
static void dataBoost(String host, int port, String database) throws SQLException {
String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
try (Connection connection = DriverManager.getConnection(connectionUrl)) {
// This enables Data Boost for all partitioned queries on this connection.
connection.createStatement().execute("set spanner.data_boost_enabled=true");
// Run a partitioned query. This query will use Data Boost.
try (ResultSet resultSet =
connection
.createStatement()
.executeQuery(
"run partitioned query "
+ "select singer_id, first_name, last_name "
+ "from singers")) {
while (resultSet.next()) {
System.out.printf(
"%d %s %s\n",
resultSet.getLong("singer_id"),
resultSet.getString("first_name"),
resultSet.getString("last_name"));
}
}
}
}
}
Go
import (
"context"
"fmt"
"github.com/jackc/pgx/v5"
)
func DataBoost(host string, port int, database string) error {
ctx := context.Background()
connString := fmt.Sprintf(
"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
host, port, database)
conn, err := pgx.Connect(ctx, connString)
if err != nil {
return err
}
defer conn.Close(ctx)
// This enables Data Boost for all partitioned queries on this connection.
_, _ = conn.Exec(ctx, "set spanner.data_boost_enabled=true")
// Run a partitioned query. This query will use Data Boost.
rows, err := conn.Query(ctx, "run partitioned query select singer_id, first_name, last_name from singers")
defer rows.Close()
if err != nil {
return err
}
for rows.Next() {
var singerId int64
var firstName, lastName string
err = rows.Scan(&singerId, &firstName, &lastName)
if err != nil {
return err
}
fmt.Printf("%v %v %v\n", singerId, firstName, lastName)
}
return nil
}
Node.js
import { Client } from 'pg';
async function dataBoost(host: string, port: number, database: string): Promise<void> {
const connection = new Client({
host: host,
port: port,
database: database,
});
await connection.connect();
// This enables Data Boost for all partitioned queries on this connection.
await connection.query("set spanner.data_boost_enabled=true");
// Run a partitioned query. This query will use Data Boost.
const singers = await connection.query(
"run partitioned query "
+ "select singer_id, first_name, last_name "
+ "from singers");
for (const row of singers.rows) {
console.log(`${row["singer_id"]} ${row["first_name"]} ${row["last_name"]}`);
}
// Close the connection.
await connection.end();
}
Python
import psycopg
def data_boost(host: str, port: int, database: str):
with (psycopg.connect("host={host} port={port} dbname={database} "
"sslmode=disable".format(host=host,
port=port,
database=database)) as conn):
# Set autocommit=True so each query uses a separate transaction.
conn.autocommit = True
with conn.cursor() as cur:
# This enables Data Boost for all partitioned queries on this
# connection.
cur.execute("set spanner.data_boost_enabled=true")
# Run a partitioned query. This query will use Data Boost.
cur.execute("run partitioned query "
"select singer_id, first_name, last_name "
"from singers")
for singer in cur:
print(singer)
C#
using Npgsql;
namespace dotnet_snippets;
public static class DataBoostSample
{
public static void DataBoost(string host, int port, string database)
{
var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
using var connection = new NpgsqlConnection(connectionString);
connection.Open();
using var cmd = connection.CreateCommand();
// This enables Data Boost for all partitioned queries on this connection.
cmd.CommandText = "set spanner.data_boost_enabled=true";
cmd.ExecuteNonQuery();
// Run a partitioned query. This query will use Data Boost.
cmd.CommandText = "run partitioned query "
+ "select singer_id, first_name, last_name "
+ "from singers";
using var reader = cmd.ExecuteReader();
while (reader.Read())
{
Console.WriteLine($"{reader["singer_id"]} {reader["first_name"]} {reader["last_name"]}");
}
}
}
PHP
function data_boost(string $host, string $port, string $database): void
{
$dsn = sprintf("pgsql:host=%s;port=%s;dbname=%s", $host, $port, $database);
$connection = new PDO($dsn);
// This enables Data Boost for all partitioned queries on this
// connection.
$connection->exec("set spanner.data_boost_enabled=true");
// Run a partitioned query. This query will use Data Boost.
$statement = $connection->query(
"run partitioned query "
."select singer_id, first_name, last_name "
."from singers"
);
$rows = $statement->fetchAll();
foreach ($rows as $singer) {
printf("%s\t%s\t%s\n", $singer["singer_id"], $singer["first_name"], $singer["last_name"]);
}
$rows = null;
$statement = null;
$connection = null;
}
Run the sample with the following command:
psql
PGDATABASE=example-db ./data_boost.sh
Java
java -jar target/pgadapter-snippets/pgadapter-samples.jar databoost example-db
Go
go run sample_runner.go databoost example-db
Node.js
npm start databoost example-db
Python
python data_boost.py example-db
C#
dotnet run databoost example-db
PHP
php data_boost.php example-db
For more information about running partitioned queries and using Data Boost with PGAdapter, see Data Boost and partitioned query statements.
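The run partitioned query statement used above both partitions the query and executes every partition on the same connection. The page referenced above also documents the individual building blocks, which the following sketch uses; treat the exact statement names as an assumption to verify against that page. partition <query> returns a set of partition tokens, and each token can then be executed separately, potentially on a different connection or host:
# Partition the query. This returns one partition token per partition
# instead of the query results.
psql -c "set spanner.data_boost_enabled=true" \
-c "partition select singer_id, first_name, last_name from singers"
# Execute a single partition by passing one of the returned tokens.
# The token value below is a placeholder.
# psql -c "run partition '<partition-token>'"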
Partitioned DML
Partitioned Data Manipulation Language (DML) is designed for the following types of bulk updates and deletes:
- Periodic cleanup and garbage collection (see the sketch after this list).
- Backfilling new columns with default values.
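As an illustration of the first use case, the following hypothetical cleanup deletes concerts that ended before a cutoff date. The concerts table and its end_time column were created earlier in this tutorial; the cutoff date is an example value:
#!/bin/bash
export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"
# Partitioned DML executes the statement as a series of partitioned,
# non-atomic deletes, so it can exceed the per-transaction mutation limit.
psql -c "set spanner.autocommit_dml_mode='partitioned_non_atomic'" \
-c "delete from concerts
where end_time < timestamptz '2024-01-01'"
echo "Deleted old concerts using Partitioned DML"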
psql
#!/bin/bash
export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"
# Change the DML mode that is used by this connection to Partitioned
# DML. Partitioned DML is designed for bulk updates and deletes.
# See https://cloud.google.com/spanner/docs/dml-partitioned for more
# information.
psql -c "set spanner.autocommit_dml_mode='partitioned_non_atomic'" \
-c "update albums
set marketing_budget=0
where marketing_budget is null"
echo "Updated albums using Partitioned DML"
Java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
class PartitionedDml {
static void partitionedDml(String host, int port, String database) throws SQLException {
String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
try (Connection connection = DriverManager.getConnection(connectionUrl)) {
// Enable Partitioned DML on this connection.
connection
.createStatement()
.execute("set spanner.autocommit_dml_mode='partitioned_non_atomic'");
// Back-fill a default value for the MarketingBudget column.
long lowerBoundUpdateCount =
connection
.createStatement()
.executeUpdate("update albums set marketing_budget=0 where marketing_budget is null");
System.out.printf("Updated at least %d albums\n", lowerBoundUpdateCount);
}
}
}
Go
import (
"context"
"fmt"
"github.com/jackc/pgx/v5"
)
func PartitionedDML(host string, port int, database string) error {
ctx := context.Background()
connString := fmt.Sprintf(
"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
host, port, database)
conn, err := pgx.Connect(ctx, connString)
if err != nil {
return err
}
defer conn.Close(ctx)
// Enable Partitioned DML on this connection.
if _, err := conn.Exec(ctx, "set spanner.autocommit_dml_mode='partitioned_non_atomic'"); err != nil {
return err
}
// Back-fill a default value for the MarketingBudget column.
tag, err := conn.Exec(ctx, "update albums set marketing_budget=0 where marketing_budget is null")
if err != nil {
return err
}
fmt.Printf("Updated at least %v albums\n", tag.RowsAffected())
return nil
}
Node.js
import { Client } from 'pg';
async function partitionedDml(host: string, port: number, database: string): Promise<void> {
const connection = new Client({
host: host,
port: port,
database: database,
});
await connection.connect();
// Enable Partitioned DML on this connection.
await connection.query("set spanner.autocommit_dml_mode='partitioned_non_atomic'");
// Back-fill a default value for the MarketingBudget column.
const lowerBoundUpdateCount = await connection.query(
"update albums " +
"set marketing_budget=0 " +
"where marketing_budget is null");
console.log(`Updated at least ${lowerBoundUpdateCount.rowCount} albums`);
// Close the connection.
await connection.end();
}
Python
import psycopg


def execute_partitioned_dml(host: str, port: int, database: str):
    with psycopg.connect("host={host} port={port} dbname={database} "
                         "sslmode=disable".format(host=host,
                                                  port=port,
                                                  database=database)) as conn:
        conn.autocommit = True
        with conn.cursor() as cur:
            # Change the DML mode that is used by this connection to
            # Partitioned DML. Partitioned DML is designed for bulk updates
            # and deletes.
            # See https://cloud.google.com/spanner/docs/dml-partitioned for
            # more information.
            cur.execute(
                "set spanner.autocommit_dml_mode='partitioned_non_atomic'")
            # The following statement will use Partitioned DML.
            cur.execute("update albums "
                        "set marketing_budget=0 "
                        "where marketing_budget is null")
            print("Updated at least %d albums" % cur.rowcount)
C#
using Npgsql;
namespace dotnet_snippets;
public static class PartitionedDmlSample
{
public static void PartitionedDml(string host, int port, string database)
{
var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
using var connection = new NpgsqlConnection(connectionString);
connection.Open();
// Enable Partitioned DML on this connection.
using var cmd = connection.CreateCommand();
cmd.CommandText = "set spanner.autocommit_dml_mode='partitioned_non_atomic'";
cmd.ExecuteNonQuery();
// Back-fill a default value for the MarketingBudget column.
cmd.CommandText = "update albums set marketing_budget=0 where marketing_budget is null";
var lowerBoundUpdateCount = cmd.ExecuteNonQuery();
Console.WriteLine($"Updated at least {lowerBoundUpdateCount} albums");
}
}
PHP
function execute_partitioned_dml(string $host, string $port, string $database): void
{
$dsn = sprintf("pgsql:host=%s;port=%s;dbname=%s", $host, $port, $database);
$connection = new PDO($dsn);
// Change the DML mode that is used by this connection to Partitioned
// DML. Partitioned DML is designed for bulk updates and deletes.
// See https://cloud.google.com/spanner/docs/dml-partitioned for more
// information.
$connection->exec("set spanner.autocommit_dml_mode='partitioned_non_atomic'");
// The following statement will use Partitioned DML.
$rowcount = $connection->exec(
"update albums "
."set marketing_budget=0 "
."where marketing_budget is null"
);
printf("Updated at least %d albums\n", $rowcount);
$statement = null;
$connection = null;
}
Run the sample with the following command:
psql
PGDATABASE=example-db ./partitioned_dml.sh
Java
java -jar target/pgadapter-snippets/pgadapter-samples.jar partitioneddml example-db
Go
go run sample_runner.go partitioneddml example-db
Node.js
npm start partitioneddml example-db
Python
python partitioned_dml.py example-db
C#
dotnet run partitioneddml example-db
PHP
php partitioned_dml.php example-db
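After the sample finishes, you can verify the backfill by counting the rows the statement touched. A minimal verification sketch, again with assumed local connection values:
import psycopg

# Minimal verification sketch (assumed local PGAdapter on localhost:5432):
# count the albums whose marketing_budget is now 0 after the backfill.
with psycopg.connect("host=localhost port=5432 dbname=example-db "
                     "sslmode=disable") as conn:
    conn.autocommit = True
    with conn.cursor() as cur:
        cur.execute("select count(*) from albums where marketing_budget=0")
        print("Albums with marketing_budget=0:", cur.fetchone()[0])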
Clean up
To avoid incurring additional charges to your Cloud Billing account for the resources used in this tutorial, drop the database and delete the instance that you created.
Delete the database
Deleting an instance automatically deletes all of its databases. This step shows how to delete a database while keeping the instance (you will still be charged for the instance).
Using the command line
gcloud spanner databases delete example-db --instance=test-instance
Using the Google Cloud console
Go to the Spanner Instances page in the Google Cloud console.
Click the instance.
Click the database that you want to delete.
On the Database details page, click Delete.
Confirm that you want to delete the database and click Delete.
Delete the instance
Deleting an instance automatically drops all databases created in it.
Using the command line
gcloud spanner instances delete test-instance
Using the Google Cloud console
Go to the Spanner Instances page in the Google Cloud console.
Click the instance.
Click Delete.
Confirm that you want to delete the instance and click Delete.
What's next
Learn how to access Spanner from a virtual machine instance.
Learn more about authorization and authentication credentials in Authenticate to Cloud services using client libraries.
Learn more about Spanner schema design best practices.