Datastream to MySQL or PostgreSQL (Stream) 範本

「Datastream to SQL」範本是串流管道,可讀取 Datastream 資料,並將資料複製到任何 MySQL 或 PostgreSQL 資料庫。範本會使用 Pub/Sub 通知從 Cloud Storage 讀取資料,並將這些資料複製到 SQL 副本資料表。指定 gcsPubSubSubscription 參數,從 Pub/Sub 通知讀取資料,或提供 inputFilePattern 參數,直接從 Cloud Storage 中的檔案讀取資料。

範本不支援資料定義語言 (DDL),且預期所有資料表都已存在於資料庫中。 複製作業會使用 Dataflow 有狀態轉換來篩選過時資料,並確保無序資料的一致性。舉例來說,如果資料列的較新版本已通過,系統就會忽略該資料列的延遲版本。執行的資料操縱語言 (DML) 會盡可能完美地將來源資料複製到目標資料。執行的 DML 陳述式遵循下列規則:

  • 如果存在主鍵,插入和更新作業會使用 upsert 語法 (即 INSERT INTO table VALUES (...) ON CONFLICT (...) DO UPDATE)。
  • 如果存在主鍵,刪除作業會複製為刪除 DML。
  • 如果沒有主鍵,插入和更新作業都會插入資料表。
  • 如果沒有主鍵,系統會忽略刪除作業。

如果您使用 Oracle 轉移至 Postgres 的公用程式,請在 SQL 中新增 ROWID 做為主鍵 (如果沒有主鍵)。

管道相關規定

  • 準備好或已複製資料的 Datastream 串流。
  • 為 Datastream 資料啟用 Cloud Storage Pub/Sub 通知
  • PostgreSQL 資料庫已填入必要結構定義。
  • 設定 Dataflow 工作站與 PostgreSQL 之間的網路存取權。

範本參數

必要參數

  • inputFilePattern:Cloud Storage 中要複製的 Datastream 檔案位置。這個檔案位置通常是串流的根路徑。
  • databaseHost:要連線的 SQL 主機。
  • databaseUser:具備所有必要權限的 SQL 使用者,可寫入所有複寫資料表。
  • databasePassword:SQL 使用者的密碼。

選用參數

  • gcsPubSubSubscription:具有 Datastream 檔案通知的 Pub/Sub 訂閱項目。例如:projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_ID>
  • inputFileFormat:Datastream 產生的輸出檔案格式。例如 avrojson。預設值為 avro
  • streamName:要輪詢結構定義資訊的串流名稱或範本。預設值為 {_metadata_stream}
  • rfcStartDateTime:用於從 Cloud Storage 擷取的開始日期時間 (https://tools.ietf.org/html/rfc3339)。預設值為 1970-01-01T00:00:00.00Z。
  • dataStreamRootUrl:Datastream API 根網址。預設值為:https://datastream.googleapis.com/
  • databaseType:要寫入的資料庫類型 (例如 Postgres)。預設值為 postgres。
  • databasePort:要連線的 SQL 資料庫連接埠。預設值為 5432
  • databaseName:要連線的 SQL 資料庫名稱。預設值為 postgres
  • schemaMap:用於指定結構定義名稱變更的鍵/值對應 (即 old_name:new_name、CaseError:case_error)。預設為空白。
  • customConnectionString:選用連線字串,將取代預設資料庫字串。
  • numThreads:決定「格式至 DML」步驟的鍵值平行處理,具體來說,這個值會傳遞至 Reshuffle.withNumBuckets。預設值為 100。

執行範本

控制台

  1. 前往 Dataflow 的「Create job from template」(透過範本建立工作) 頁面。
  2. 前往「依據範本建立工作」
  3. 在「工作名稱」欄位中,輸入專屬工作名稱。
  4. 選用:如要使用區域端點,請從下拉式選單中選取值。預設區域為 us-central1

    如需可執行 Dataflow 工作的地區清單,請參閱「Dataflow 位置」。

  5. 從「Dataflow template」(Dataflow 範本) 下拉式選單中選取 the Cloud Datastream to SQL template。
  6. 在提供的參數欄位中輸入參數值。
  7. 按一下「Run Job」(執行工作)

gcloud

在殼層或終端機中執行範本:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --enable-streaming-engine \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_SQL \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
databaseHost=DATABASE_HOST,\
databaseUser=DATABASE_USER,\
databasePassword=DATABASE_PASSWORD
  

更改下列內容:

  • PROJECT_ID: 您要執行 Dataflow 工作的專案 ID Google Cloud
  • JOB_NAME: 您選擇的不重複工作名稱
  • REGION_NAME: 您要部署 Dataflow 工作的地區,例如 us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • GCS_FILE_PATH:Datastream 資料的 Cloud Storage 路徑。例如:gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME:要從中讀取變更檔案的 Pub/Sub 訂閱項目。例如:projects/my-project-id/subscriptions/my-subscription-id
  • DATABASE_HOST:您的 SQL 主機 IP。
  • DATABASE_USER:您的 SQL 使用者。
  • DATABASE_PASSWORD:您的 SQL 密碼。

API

如要使用 REST API 執行範本,請傳送 HTTP POST 要求。如要進一步瞭解 API 和授權範圍,請參閱 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {

          "inputFilePattern": "GCS_FILE_PATH",
          "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
          "databaseHost": "DATABASE_HOST",
          "databaseUser": "DATABASE_USER",
          "databasePassword": "DATABASE_PASSWORD"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_SQL",
   }
}
  

更改下列內容:

  • PROJECT_ID: 您要執行 Dataflow 工作的專案 ID Google Cloud
  • JOB_NAME: 您選擇的不重複工作名稱
  • LOCATION: 您要部署 Dataflow 工作的地區,例如 us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • GCS_FILE_PATH:Datastream 資料的 Cloud Storage 路徑。例如:gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME:要從中讀取變更檔案的 Pub/Sub 訂閱項目。例如:projects/my-project-id/subscriptions/my-subscription-id
  • DATABASE_HOST:您的 SQL 主機 IP。
  • DATABASE_USER:您的 SQL 使用者。
  • DATABASE_PASSWORD:您的 SQL 密碼。

後續步驟