管理串流

本頁面將說明如何使用 Datastream API 執行下列操作:

  • 建立串流
  • 取得串流和串流物件相關資訊
  • 啟動、暫停、繼續及修改串流,以及啟動及停止串流物件的補充作業,藉此更新串流
  • 復原永久失敗的串流
  • 為 Oracle 串流啟用大型物件的串流功能
  • 刪除串流

您可以透過兩種方式使用 Datastream API。您可以發出 REST API 呼叫,也可以使用 Google Cloud CLI (CLI)。

如要概略瞭解如何使用 gcloud 管理 Datastream 串流,請參閱「gcloud Datastream 串流」。

建立串流

本節將說明如何建立用於將資料從來源傳輸至目的地的串流。以下範例並非完整說明,而是著重於 Datastream 的特定功能。如要解決特定用途,請搭配使用這些範例和 Datastream API 參考資料說明文件

本節將說明下列用途:

範例 1:將特定物件串流至 BigQuery

本範例將說明如何:

  • 將資料從 MySQL 串流至 BigQuery
  • 在串流中納入一組物件
  • 將串流的寫入模式定義為僅供附加
  • 補充串流中包含的所有物件

以下是要求,用於從 schema1 提取所有資料表,以及從 schema2 提取兩個特定資料表:tableAtableC。事件會寫入 BigQuery 中的資料集。

要求不包含 customerManagedEncryptionKey 參數,因此會使用 Google Cloud 內部金鑰管理系統加密資料,而非使用 CMEK。

與執行歷史補充作業 (或快照) 相關聯的 backfillAll 參數會設為空字典 ({}),這表示 Datastream 會從串流中包含的所有資料表中補充歷史資料。

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlCdcStream
{
  "displayName": "MySQL CDC to BigQuery",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          { "database": "schema1" },
          {
            "database": "schema2",
            "mysqlTables": [
              {
                "table": "tableA",
                "table": "tableC"
              }
            ]
          }
        ]
      },
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
          "location": "us",
          "datasetIdPrefix": "prefix_"
        }
      },
      "dataFreshness": "900s"
    }
  },
  "backfillAll": {}
}

gcloud

如要進一步瞭解如何使用 gcloud 建立串流,請參閱 Google Cloud SDK 說明文件

範例 2:從含有 PostgreSQL 來源的串流中排除特定物件

本範例將說明如何:

  • 將資料從 PostgreSQL 串流至 BigQuery
  • 從串流中排除物件
  • 從補充作業中排除物件

以下程式碼顯示建立串流的要求,該串流用於將資料從來源 PostgreSQL 資料庫傳輸至 BigQuery。從來源 PostgreSQL 資料庫建立串流時,您需要在要求中指定兩個額外的 PostgreSQL 專屬欄位:

  • replicationSlot:複製運算單元是設定 PostgreSQL 資料庫以進行複製作業的必要條件。您必須為每個串流建立複製時段。
  • publication:發布作業是一組您要複製變更的資料表。您必須先在資料庫中建立發布作業名稱,才能開始串流。發布作業至少必須包含串流 includeObjects 清單中指定的資料表。

與執行歷史回填 (或快照) 相關聯的 backfillAll 參數已設為排除一個資料表。

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myPostgresStream
{
  "displayName": "PostgreSQL to BigQueryCloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/postgresCp",
    "postgresqlSourceConfig": {
      "replicationSlot": "replicationSlot1",
      "publication": "publicationA",
      "includeObjects": {
        "postgresqlSchemas": {
          "schema": "schema1"
        }
      },
      "excludeObjects": {
        "postgresqlSchemas": [
          { "schema": "schema1",
        "postgresqlTables": [
          {
            "table": "tableA",
            "postgresqlColumns": [
              { "column": "column5" }
              ]
              }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "dataFreshness": "900s",
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
           "location": "us",
           "datasetIdPrefix": "prefix_"
        }
      }
    }
  },
  "backfillAll": {
    "postgresqlExcludedObjects": {
        "postgresqlSchemas": [
          { "schema": "schema1",
            "postgresqlTables": [
              { "table": "tableA" }
            ]
          }
        ]
      }
    }
  }

gcloud

如要進一步瞭解如何使用 gcloud 建立串流,請參閱 Google Cloud SDK 說明文件

範例 3:為串流指定僅附加寫入模式

將資料串流至 BigQuery 時,您可以定義寫入模式:mergeappendOnly。詳情請參閱「設定寫入模式」。

如果您未在建立串流的要求中指定寫入模式,系統會使用預設的 merge 模式。

下列要求說明如何在建立 MySQL 到 BigQuery 串流時,定義 appendOnly 模式。

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=appendOnlyStream
{
  "displayName": "My append-only stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          { "database": "myMySqlDb"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
          "location": "us",
          "datasetIdPrefix": "prefix_"
        }
      },
      "appendOnly": {}
    }
  },
  "backfillAll": {}
}

gcloud

如要進一步瞭解如何使用 gcloud 建立串流,請參閱 Google Cloud SDK 說明文件

示例 4:串流至 Cloud Storage 目的地

本範例將說明如何:

  • 從 Oracle 串流至 Cloud Storage
  • 定義要納入串流的物件組合
  • 定義用於加密靜態資料的 CMEK

以下要求示範如何建立會將事件寫入 Cloud Storage 值區的串流。

在這個範例要求中,事件會以 JSON 輸出格式寫入,且每 100 MB 或 30 秒就會建立新的檔案 (覆寫預設值 50 MB 和 60 秒)。

針對 JSON 格式,您可以:

  • 在路徑中加入一致類型結構定義檔案。因此,Datastream 會將兩個檔案寫入 Cloud Storage:JSON 資料檔案和 Avro 結構定義檔案。結構定義檔案的名稱與資料檔案相同,但副檔名為 .schema

  • 啟用 gzip 壓縮功能,讓 Datastream 壓縮寫入 Cloud Storage 的檔案。

透過使用 backfillNone 參數,要求會指定只將目前的變更串流至目的地,而不會填補資料。

這項要求會指定客戶代管的加密金鑰參數,讓您控管用於加密 Google Cloud 專案中靜態資料的金鑰。這個參數是指 Datastream 用來加密從來源串流至目標的資料的 CMEK。並指定 CMEK 的金鑰環。

如要進一步瞭解鑰匙圈,請參閱「Cloud KMS 資源」。如要進一步瞭解如何使用加密金鑰保護資料,請參閱「Cloud Key Management Service (KMS)」。

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleCdcStream
{
  "displayName": "Oracle CDC to Cloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/
    connectionProfiles/OracleCp",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "GcsBucketCp",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "jsonFileFormat": {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      },
      "fileRotationMb": 100,
      "fileRotationInterval": 30
    }
  },
  "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/
  keyRings/myRing/cryptoKeys/myEncryptionKey",
  "backfillNone": {}
}

gcloud

如要進一步瞭解如何使用 gcloud 建立串流,請參閱 Google Cloud SDK 說明文件

示例 5:串流至 BigLake 代管資料表

在本例中,您將瞭解如何設定串流,以便在 append-only 模式下,將資料從 MySQL 資料庫複製到 BigLake 管理式資料表 (BLMT)。建立要求前,請確認您已完成下列步驟:

  • 擁有要用來儲存資料的 Cloud Storage 值區
  • 建立 Cloud 資源連線
  • 授予 Cloud 資源連線存取 Cloud Storage 值區

接著,您可以使用下列要求建立串流:

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlBlmtStream
{
  "displayName": "MySQL to BLMT stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlBlmtCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          {
            "database": "my-mysql-database"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "projects/myProjectId1/locations/us-central1/connectionProfiles/my-bq-cp-id",
    "bigqueryDestinationConfig": {
      "blmtConfig": {
        "bucket": "my-gcs-bucket-name",
        "rootPath": "my/folder",
        "connectionName": "my-project-id.us-central1.my-bigquery-connection-name",
        "fileFormat": "PARQUET",
        "tableFormat": "ICEBERG"
        },
      "singleTargetDataset": {
        "datasetId": "my-project-id:my-bigquery-dataset-id"
      },
      "appendOnly": {}
    }
  },
  "backfillAll": {}
}

gcloud

datastream streams create mysqlBlmtStream --location=us-central1
--display-name=mysql-to-blmt-stream --source=source --mysql-source-config=mysql_source_config.json
--destination=destination --bigquery-destination-config=blmt_config.json
--backfill-none

mysql_source_config.json 來源設定檔的內容:

{"excludeObjects": {}, "includeObjects": {"mysqlDatabases":[{"database":"my-mysql-database"}]}}

blmt_config.json 設定檔的內容:

{ "blmtConfig": { "bucket": "my-gcs-bucket-name", "rootPath": "my/folder", "connectionName": "my-project-id.us-central1.my-bigquery-connection-name", "fileFormat": "PARQUET", "tableFormat": "ICEBERG" }, "singleTargetDataset": {"datasetId": "my-project-id:my-bigquery-dataset-id"}, "appendOnly": {} }

Terraform

resource "google_datastream_stream" "stream" {
  stream_id    = "mysqlBlmtStream"
  location     = "us-central1"
  display_name = "MySQL to BLMT stream"

  source_config {
    source_connection_profile = "/projects/myProjectId1/locations/us-central1/streams/mysqlBlmtCp"
    mysql_source_config {
      include_objects {
        mysql_databases {
          database = "my-mysql-database"
        }
      }
    }
  }

  destination_config {
    destination_connection_profile = "projects/myProjectId1/locations/us-central1/connectionProfiles/my-bq-cp-id"
    bigquery_destination_config {
      single_target_dataset {
        dataset_id = "my-project-id:my-bigquery-dataset-id"
      }
      blmt_config {
        bucket          = "my-gcs-bucket-name"
        table_format    = "ICEBERG"
        file_format     = "PARQUET"
        connection_name = "my-project-id.us-central1.my-bigquery-connection-name"
        root_path       = "my/folder"
      }
      append_only {}
    }
  }

  backfill_none {}
}
    

驗證串流定義

建立串流之前,您可以驗證其定義。這樣一來,您就能確保所有驗證檢查都通過,並且串流在建立時會順利執行。

驗證串流會檢查:

  • 來源是否已正確設定,讓 Datastream 能夠從中串流資料。
  • 串流是否可以連線至來源和目的地。
  • 串流的端對端設定。

如要驗證串流,請在要求主體前方的網址中加入 &validate_only=true

POST "https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams?streamId=STREAM_ID&validate_only=true"

提出這項要求後,您會看到 Datastream 針對來源和目的地執行的驗證檢查,以及檢查是否通過或失敗。如果驗證檢查未通過,系統會顯示失敗原因和修正問題的方法。

舉例來說,假設您有一個客戶自行管理的加密金鑰 (CMEK),希望 Datastream 用來加密從來源串流至目的地的資料。在驗證串流時,Datastream 會驗證金鑰是否存在,以及 Datastream 是否具備使用金鑰的權限。如果不符合上述任一條件,驗證串流時就會傳回以下錯誤訊息:

CMEK_DOES_NOT_EXIST_OR_MISSING_PERMISSIONS

如要解決這個問題,請確認您提供的金鑰確實存在,且 Datastream 服務帳戶具備該組金鑰的 cloudkms.cryptoKeys.get 權限。

修正後,請再次提出要求,確認所有驗證檢查都通過。以上述範例來說,CMEK_VALIDATE_PERMISSIONS 檢查將不再傳回錯誤訊息,但狀態會是 PASSED

取得串流的相關資訊

下列程式碼顯示要求,用於擷取串流的相關資訊。這類資訊包括:

  • 串流名稱 (專屬 ID)
  • 串流的使用者友善名稱 (顯示名稱)
  • 建立串流和上次更新時間的時間戳記
  • 與串流相關聯的來源和目的地連線設定檔資訊
  • 串流的狀態

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams/STREAM_ID

回應如下所示:

{
  "name": "myOracleCdcStream",
  "displayName": "Oracle CDC to Cloud Storage",
  "createTime": "2019-12-15T15:01:23.045123456Z",
  "updateTime": "2019-12-15T15:01:23.045123456Z",
  "sourceConfig": {
    "sourceConnectionProfileName": "myOracleDb",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1"
          },
          {
            "schema": "schema3",
            "oracleTables": [
              { "table": "tableA" },
              { "table": "tableC" }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "myGcsBucket",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "avroFileFormat": {},
      "fileRotationMb": 100,
      "fileRotationInterval": 60
    }
  },
  "state": "RUNNING"
  "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/
  keyRings/myRing/cryptoKeys/myEncryptionKey",
  "backfillAll": {}
}

gcloud

如要進一步瞭解如何使用 gcloud 擷取串流資訊,請按這裡

可列出串流

以下程式碼顯示要求,用於擷取指定專案和位置中的所有串流清單。

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams

gcloud

如要進一步瞭解如何使用 gcloud 擷取所有串流的相關資訊,請按這裡

列出串流的物件

以下程式碼顯示要求,用於擷取串流的所有物件相關資訊。

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects

gcloud

如要進一步瞭解如何使用 gcloud 擷取串流中所有物件的相關資訊,請按這裡

傳回的物件清單可能如下所示:

REST

{
  "streamObjects": [
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object1",
      "displayName": "employees.salaries",
      "backfillJob": {
        "state": "ACTIVE",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T12:12:26.344878Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "salaries"
        }
      }
    },
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object2",
      "displayName": "contractors.hours",
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "contractors",
          "table": "hours"
        }
      }
    },
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object3",
      "displayName": "employees.departments",
      "backfillJob": {
        "state": "COMPLETED",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T11:26:12.869880Z",
        "lastEndTime": "2021-10-18T11:26:28.405653Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "departments"
        }
      }
    }
  ]
}

gcloud

如要進一步瞭解如何使用 gcloud 列出串流的物件,請按這裡

啟動串流

下列程式碼顯示啟動串流的要求。

在要求中使用 updateMask 參數,即可只在要求主體中加入您指定的欄位。如要啟動串流,請將 state 欄位中的值從 CREATED 變更為 RUNNING

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "RUNNING"
}

gcloud

如要進一步瞭解如何使用 gcloud 開始串流,請按這裡

暫停串流

以下程式碼顯示暫停執行中的串流要求。

在本例中,updateMask 參數指定的欄位為 state 欄位。暫停串流會將其狀態從 RUNNING 變更為 PAUSED

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "PAUSED"
}

gcloud

如要進一步瞭解如何使用 gcloud 暫停串流,請按這裡

繼續執行串流

以下程式碼顯示要求繼續暫停的串流。

在本例中,updateMask 參數指定的欄位為 state 欄位。恢復串流後,串流狀態就會從 PAUSED 變更為 RUNNING

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "RUNNING"
}

gcloud

如要進一步瞭解如何使用 gcloud 繼續串流,請按這裡

復原串流

您可以使用 RunStream 方法復原永久失敗的串流。每個來源資料庫類型都會定義可用的串流復原作業。詳情請參閱「復原串流」。

復原 MySQL 或 Oracle 來源的資料流

以下程式碼範例顯示從不同記錄檔位置復原 MySQL 或 Oracle 來源串流的要求:

REST

從目前位置恢復串流。這是預設選項:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

從下一個可用位置恢復串流:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "nextAvailableStartPosition": {}
  }
}

從最新位置恢復串流:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "mostRecentStartPosition": {}
  }
}

從特定位置復原串流 (以 MySQL 二進位記錄為基礎的複製作業):

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "NAME_OF_THE_LOG_FILE"
        "logPosition": POSITION
      }
    }
  }
}

更改下列內容:

  • NAME_OF_THE_LOG_FILE:您要從中復原串流的記錄檔案名稱
  • POSITION:您要從記錄檔案中哪個位置開始復原串流。如果您未提供值,Datastream 會從檔案的標頭復原串流。

例如:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "binlog.001"
        "logPosition": 4
      }
    }
  }
}

從特定位置復原串流 (MySQL 以 GTID 為基礎的複製模式):

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlGtidPosition": {
        "gtidSet": "GTID_SET"
      }
    }
  }
}

GTID_SET 替換為一或多個單一 GTID 或 GTID 範圍,以便從中復原串流。

例如:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlGtidPosition": {
        "gtidSet": "22cc56f5-3862-379a-9af5-051c59baef9d:1-561143685:561143688-591036613,b8d7df02-832b-32b9-bec7-2018806b76f6:1-3"
      }
    }
  }
}

從特定位置恢復串流 (Oracle):

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": scn
      }
    }
  }
}
請將 scn 替換為重做記錄檔案中的系統變更編號 (SCN),以便從該檔案復原串流。這是必填欄位。

例如:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": 234234
      }
    }
  }
}

如要進一步瞭解可用的復原選項,請參閱「復原串流」。

gcloud

系統不支援使用 gcloud 復原串流。

復原 PostgreSQL 來源的串流

下列程式碼範例顯示要求復原 PostgreSQL 來源的串流。在復原期間,串流會從為串流設定的複製運算單元中的第一組記錄檔序號 (LSN) 開始讀取內容。

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

例如:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run

如要變更複製運算單元,請先使用新的複製運算單元名稱更新串流:

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=sourceConfig.postgresqlSourceConfig.replicationSlot
{
  "sourceConfig": {
    "postgresqlSourceConfig": {
      "replicationSlot": "NEW_REPLICATION_SLOT_NAME"
    }
  }
}

gcloud

系統不支援使用 gcloud 復原串流。

復原 SQL Server 來源的串流

下列程式碼範例顯示針對 SQL Server 來源復原串流的示例要求。

REST

從第一個可用位置復原串流:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

例如:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/mySqlServerStreamId:run

從偏好的記錄序號恢復串流:

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "sqlServerLsnPosition": {
        "lsn": lsn
      }
    }
  }
}

例如:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/mySqlServerStreamId:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "sqlServerLsnPosition": {
        "lsn": 0000123C:0000BA78:0004
      }
    }
  }
}

gcloud

系統不支援使用 gcloud 復原串流。

從特定位置開始或繼續播放串流

您可以從 MySQL 和 Oracle 來源的特定位置開始串流,或繼續串流已暫停的串流。當您想使用外部工具執行回填作業,或從您指定的位置開始 CDC 時,這項功能就很實用。針對 MySQL 來源,您需要在重做記錄檔中指出 binlog 位置或 GTID 集合;針對 Oracle 來源,則需要指出系統變更編號 (SCN)。

以下程式碼顯示從特定位置開始或繼續播放已建立的串流的要求。

從特定二進位記錄檔位置開始或繼續串流 (MySQL):

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "NAME_OF_THE_LOG_FILE"
        "logPosition": POSITION
      }
    }
  }
}

更改下列內容:

  • NAME_OF_THE_LOG_FILE:您要從中開始串流的記錄檔案名稱。
  • POSITION:您要在記錄檔案中從哪個位置開始串流。如果您未提供值,Datastream 會從檔案的開頭開始讀取。

例如:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "binlog.001"
        "logPosition": 2
      }
    }
  }
}

gcloud

系統不支援使用 gcloud 從特定位置開始或繼續串流。如要瞭解如何使用 gcloud 啟動或繼續串流,請參閱 Cloud SDK 說明文件

從特定 GTID 集合開始或繼續串流 (MySQL):

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlGtidPosition": {
        "gtidSet": "GTID_SET"
      }
    }
  }
}

GTID_SET 替換為一或多個單一 GTID 或 GTID 範圍,以便從中開始或繼續串流。

例如:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlGtidPosition": {
        "gtidSet": "22cc56f5-3862-379a-9af5-051c59baef9d:1-561143685:561143688-591036613,b8d7df02-832b-32b9-bec7-2018806b76f6:3-7"
      }
    }
  }
}

gcloud

系統不支援使用 gcloud 從特定位置開始或繼續串流。如要瞭解如何使用 gcloud 啟動或繼續串流,請參閱 Cloud SDK 說明文件

從重做記錄檔 (Oracle) 中的特定系統變更編號開始或繼續串流:

REST

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": scn
      }
    }
  }
}
scn 替換為重做記錄檔中要從中開始串流的系統變更編號 (SCN)。這是必填欄位。

例如:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": 123123
      }
    }
  }
}

gcloud

系統不支援使用 gcloud 從特定位置開始或繼續串流。如要瞭解如何使用 gcloud 啟動串流,請參閱 Cloud SDK 說明文件

修改串流

以下程式碼顯示要求更新串流的檔案輪替設定,以便每隔 75 MB 或 45 秒輪替檔案。

在本例中,updateMask 參數指定的欄位包括 fileRotationMbfileRotationInterval 欄位,分別由 destinationConfig.gcsDestinationConfig.fileRotationMbdestinationConfig.gcsDestinationConfig.fileRotationInterval 旗標代表。

REST

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig.
fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "fileRotationMb": 75,
      "fileRotationInterval": 45
    }
  }
}

下列程式碼顯示要求,要求在 Datastream 寫入 Cloud Storage 的檔案路徑中加入一致類型結構定義檔案。因此,Datastream 會寫入兩個檔案:JSON 資料檔案和 Avro 結構定義檔案。

在本例中,指定的欄位是 jsonFileFormat 欄位,由 destinationConfig.gcsDestinationConfig.jsonFileFormat 標記表示。

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig.
jsonFileFormat
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "jsonFileFormat" {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      }  
    }
  }
}

下列程式碼顯示要求 Datastream 從來源資料庫複製現有資料,以及資料的持續變更至目的地。

程式碼的 oracleExcludedObjects 部分會顯示那些表格和結構定義,這些項目無法回填至目的地。

在本例中,除了結構定義 3 中的資料表 A 外,所有資料表和結構定義都會進行補充。

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=backfillAll
{
  "backfillAll": {
    "oracleExcludedObjects": {
      "oracleSchemas": [
        {
          "schema": "schema3",
          "oracleTables": [
            { "table": "tableA" }
          ]
        }
      ]
    }
  }
}  

gcloud

如要進一步瞭解如何使用 gcloud 修改串流,請按這裡

啟動串流物件的補充作業

Datastream 中的串流可以補充歷來資料,並將持續變更的資料串流至目的地。進行中的變更一律會從來源串流至目的地。不過,您可以指定是否要串流歷來資料。

如果您想從來源串流歷來資料至目的地,請使用 backfillAll 參數。

Datastream 也能讓您只串流特定資料庫資料表的歷來資料。如要這麼做,請使用 backfillAll 參數,並排除不想使用歷來資料的資料表。

如果您只想將持續變更的內容串流至目的地,請使用 backfillNone 參數。接著,如果您想讓 Datastream 從來源串流所有現有資料的快照至目的地,就必須針對含有這類資料的物件手動啟動補充作業。

另一個啟動物件補充作業的原因,是當來源和目的地之間的資料不同步時。舉例來說,使用者可能會不小心刪除目的地中的資料,導致資料遺失。在這種情況下,啟動物件的補充作業會做為「重設機制」,因為所有資料都會一次串流至目的地。因此,資料會在來源和目的地之間同步。

您必須先擷取物件相關資訊,才能啟動串流物件的補充作業。

每個物件都有 OBJECT_ID,可用於唯一識別物件。您可以使用 OBJECT_ID 啟動串流的回填作業。

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects/OBJECT_ID:startBackfillJob

gcloud

如要進一步瞭解如何使用 gcloud 為串流的物件啟動回填作業,請參閱 Google Cloud SDK 說明文件

停止串流物件的補充作業

啟動串流物件補充作業後,您可以停止物件的補充作業。舉例來說,如果使用者修改資料庫結構定義,結構定義或資料可能會損毀。您不希望這項結構定義或資料串流至目的地,因此請停止為物件進行回填。

您也可以為物件停止補充作業,以便負載平衡。Datastream 可以並行執行多個回填作業。這可能增加來源的負載。如果負載量太大,請停止為每個物件進行補充,然後逐一啟動物件的補充作業。

您必須先提出要求,擷取串流中所有物件的相關資訊,才能停止串流物件的補充作業。每個傳回的物件都會附帶 OBJECT_ID,可用於唯一識別物件。您可以使用 OBJECT_ID 停止串流的回填作業。

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects/OBJECT_ID:stopBackfillJob

gcloud

如要進一步瞭解如何使用 gcloud 停止串流物件的回填作業,請按這裡

變更 CDC 並行工作的數量上限

以下程式碼說明如何將 MySQL 串流的並行變更資料擷取 (CDC) 工作上限數量設為 7。

在本例中,updateMask 參數指定的欄位為 maxConcurrentCdcTasks 欄位。將其值設為 7 時,您會將並行 CDC 任務的數量從先前的值變更為 7。您可以使用 0 到 50 的值 (包括 0 與 50)。如果未定義值,或將其定義為 0,系統會為串流設定 5 個工作項的預設值。

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentCdcTasks
{
  "sourceConfig": {
    "mysqlSourceConfig": {
      "maxConcurrentCdcTasks": "7"
      }
    }  
}

gcloud

如要進一步瞭解如何使用 gcloud,請按這裡

變更並行補充工作數量上限

以下程式碼說明如何將 MySQL 串流的並行回填作業數量上限設為 25。

在本例中,updateMask 參數指定的欄位為 maxConcurrentBackfillTasks 欄位。將這個值設為 25 時,您會將並行回填工作的上限從先前的值變更為 25。您可以使用 0 到 50 的值 (包括 0 與 50)。如果未定義值,或將值定義為 0,系統會為串流設定 16 個工作項的預設值。

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/
streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentBackfillTasks
{
  "sourceConfig": {
    "mysqlSourceConfig": {
      "maxConcurrentBackfillTasks": "25"
      }
    }  
}

gcloud

如要進一步瞭解如何使用 gcloud,請按這裡

為 Oracle 來源啟用大型物件的串流功能

您可以為含有 Oracle 來源的串流啟用大型物件串流功能,例如二進位大型物件 (BLOB)、字元大型物件 (CLOB) 和國家/地區字元大型物件 (NCLOB)。streamLargeObjects 旗標可讓您在新的和現有的串流中加入大型物件。旗標會在串流層級設定,因此您不需要指定大型物件資料類型的資料欄。

以下範例說明如何建立可串流傳輸大型物件的串流。

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleLobStream
{
  "displayName": "Oracle LOB stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/OracleCp",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1",
            "oracleTables": [
              {
                "table": "tableA",
                "oracleColumns": [
                  {
                    "column": "column1,column2"
                  }
                ]
              }
            ]
          }
        ]
      },
      "excludeObjects": {},
      "streamLargeObjects": {}
    }
  }
}

gcloud

如要進一步瞭解如何使用 gcloud 更新串流,請參閱 Google Cloud SDK 說明文件

刪除串流

以下程式碼顯示刪除串流的要求。

REST

DELETE https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID

gcloud

如要進一步瞭解如何使用 gcloud 刪除串流,請按這裡

後續步驟