PostgreSQL to BigQuery テンプレート

PostgreSQL to BigQuery テンプレートは、PostgreSQL テーブルから既存の BigQuery テーブルにデータをコピーするバッチ パイプラインです。このパイプラインは JDBC を使用して PostgreSQL に接続します。保護をさらに強化するために、Cloud KMS 鍵で暗号化された Base64 でエンコードされたユーザー名、パスワード、接続文字列パラメータを渡すこともできます。ユーザー名、パスワード、接続文字列パラメータの暗号化の詳細については、Cloud KMS API 暗号化エンドポイントをご覧ください。

パイプラインの要件

  • パイプラインを実行する前に、BigQuery テーブルが存在する必要があります。
  • BigQuery テーブルに互換性のあるスキーマが必要です。
  • リレーショナル データベースは、Dataflow が実行されているサブネットからアクセス可能である必要があります。

テンプレートのパラメータ

必須パラメータ

  • driverJars: ドライバ JAR ファイルのカンマ区切りのリスト(例: gs://your-bucket/driver_jar1.jar,gs://your-bucket/driver_jar2.jar)。
  • driverClassName: JDBC ドライバのクラス名(例: com.mysql.jdbc.Driver)。
  • connectionURL: JDBC 接続 URL の文字列たとえば、jdbc:mysql://some-host:3306/sampledb のようにします。この値は、Cloud KMS 鍵で暗号化され、Base64 でエンコードされた文字列として渡すことができます。Base64 でエンコードされた文字列から空白文字を削除します。Oracle の RAC 以外のデータベース接続文字列(jdbc:oracle:thin:@some-host:<port>:<sid>)と Oracle RAC データベース接続文字列(jdbc:oracle:thin:@//some-host[:<port>]/<service_name>)の違いに注意してください(例: jdbc:mysql://some-host:3306/sampledb)。
  • outputTable: BigQuery 出力テーブルの場所(例: <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>)。
  • bigQueryLoadingTemporaryDirectory: BigQuery 読み込みプロセスの一時ディレクトリ(例: gs://your-bucket/your-files/temp_dir)。

オプション パラメータ

  • connectionProperties: JDBC 接続に使用するプロパティ文字列。文字列の形式は [propertyName=property;]* にする必要があります。詳細については、MySQL ドキュメントの構成プロパティ(https://dev.mysql.com/doc/connector-j/en/connector-j-reference-configuration-properties.html)をご覧ください(例: unicode=true;characterEncoding=UTF-8)。
  • username: JDBC 接続に使用するユーザー名。Cloud KMS 鍵で暗号化された文字列として渡すことができます。また、projects/{project}/secrets/{secret}/versions/{secret_version} の形式の Secret Manager シークレットとして渡すこともできます。
  • password: JDBC 接続に使用するパスワード。Cloud KMS 鍵で暗号化された文字列として渡すことができます。また、projects/{project}/secrets/{secret}/versions/{secret_version} の形式の Secret Manager シークレットとして渡すこともできます。
  • query: ソースで実行されるクエリでデータを抽出します。一部の JDBC SQL 型と BigQuery 型は同じ名前を共有していますが、いくつかの違いがあります。重要な SQL と BigQuery のデータ型のマッピングには、DATETIME --> TIMESTAMP などがあります。

スキーマが一致しない場合は、型キャストが必要になることがありますこのパラメータは、クエリを読み込む Cloud Storage 内のファイルを指す gs:// パスに設定できます。ファイルのエンコードは UTF-8 にする必要があります。(例: select * from sampledb.sample_table)。

  • KMSEncryptionKey: ユーザー名、パスワード、接続文字列の復号に使用する Cloud KMS 暗号鍵。Cloud KMS 鍵を渡す場合は、ユーザー名、パスワード、接続文字列も暗号化する必要があります(例: projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key)。
  • useColumnAlias: true に設定すると、パイプラインは列名の代わりに列エイリアス(AS)を使用して、行を BigQuery にマッピングします。デフォルトは false です。
  • isTruncate: true に設定すると、パイプラインは BigQuery にデータを読み込む前にデータを切り捨てます。デフォルトは false で、パイプラインはデータを追加します。
  • partitionColumn: このパラメータに、オプション パラメータとして定義された table の名前が指定されている場合、JdbcIO は範囲を使用して同じテーブル(サブクエリ)に対して複数のクエリ インスタンスを実行し、テーブルを並列で読み取ります。現在、Long パーティション列のみをサポートしています。
  • table: パーティションの使用時に読み取るテーブル。このパラメータは、かっこ内のサブクエリも受け入れます(例: (select id, name from Person) as subq)。
  • numPartitions: パーティションの数。下限と上限により、この値は、パーティション列を均等に分割するために使用される生成された WHERE 句式のパーティション ストライドを形成します。入力が 1 より小さい場合、数値は 1 に設定されます。
  • lowerBound: パーティション スキームで使用する下限。指定しない場合、サポートされているタイプのこの値は Apache Beam によって自動的に推測されます。
  • upperBound: パーティション スキームで使用する上限。指定しない場合、サポートされているタイプのこの値は Apache Beam によって自動的に推測されます。
  • fetchSize: データベースから一度に取得される行数。パーティション分割される読み取りには使用されません。デフォルトは 50,000 です。
  • createDisposition: 使用する BigQuery CreateDisposition。たとえば、CREATE_IF_NEEDEDCREATE_NEVER です。デフォルトは CREATE_NEVER です。
  • bigQuerySchemaPath: BigQuery JSON スキーマの Cloud Storage パス。createDisposition が CREATE_IF_NEEDED に設定されている場合は、このパラメータを指定する必要があります(例: gs://your-bucket/your-schema.json)。
  • disabledAlgorithms: 無効にするためのカンマ区切りのアルゴリズム。この値が none に設定されている場合、アルゴリズムは無効になりません。デフォルトで無効になっているアルゴリズムには脆弱性やパフォーマンスの問題が存在する可能性があるため、このパラメータは慎重に使用してください(例: SSLv3、RC4)。
  • extraFilesToStage: ワーカーにステージングするファイルのカンマ区切りの Cloud Storage パスまたは Secret Manager シークレット。これらのファイルは、各ワーカーの /extra_files ディレクトリに保存されます。例: gs://
  • useStorageWriteApi: true の場合、パイプラインでは BigQuery Storage Write API(https://cloud.google.com/bigquery/docs/write-api)を使用します。デフォルト値は false です。詳細については、Storage Write API の使用(https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api)をご覧ください。
  • useStorageWriteApiAtLeastOnce: Storage Write API を使用する場合は、書き込みセマンティクスを指定します。at-least-once セマンティクス(https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics)を使用するには、このパラメータを true に設定します。exactly-once セマンティクスを使用するには、パラメータを false に設定します。このパラメータは、useStorageWriteApitrue の場合にのみ適用されます。デフォルト値は false です。

テンプレートを実行する

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、[ the PostgreSQL to BigQuery template] を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PostgreSQL_to_BigQuery \
    --parameters \
connectionURL=JDBC_CONNECTION_URL,\
query=SOURCE_SQL_QUERY,\
outputTable=PROJECT_ID:DATASET.TABLE_NAME,
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS,\
connectionProperties=CONNECTION_PROPERTIES,\
username=CONNECTION_USERNAME,\
password=CONNECTION_PASSWORD,\
KMSEncryptionKey=KMS_ENCRYPTION_KEY

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • JDBC_CONNECTION_URL: JDBC 接続 URL
  • SOURCE_SQL_QUERY: ソース データベースで実行する SQL クエリ
  • DATASET: BigQuery データセット
  • TABLE_NAME: BigQuery テーブル名
  • PATH_TO_TEMP_DIR_ON_GCS: 一時ディレクトリへの Cloud Storage パス
  • CONNECTION_PROPERTIES: 必要な場合は JDBC 接続プロパティ
  • CONNECTION_USERNAME: JDBC 接続のユーザー名
  • CONNECTION_PASSWORD: JDBC 接続パスワード
  • KMS_ENCRYPTION_KEY: Cloud KMS 暗号鍵

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launchParameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PostgreSQL_to_BigQuery"
    "parameters": {
      "connectionURL": "JDBC_CONNECTION_URL",
      "query": "SOURCE_SQL_QUERY",
      "outputTable": "PROJECT_ID:DATASET.TABLE_NAME",
      "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS",
      "connectionProperties": "CONNECTION_PROPERTIES",
      "username": "CONNECTION_USERNAME",
      "password": "CONNECTION_PASSWORD",
      "KMSEncryptionKey":"KMS_ENCRYPTION_KEY"
    },
    "environment": { "zone": "us-central1-f" }
  }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • JDBC_CONNECTION_URL: JDBC 接続 URL
  • SOURCE_SQL_QUERY: ソース データベースで実行する SQL クエリ
  • DATASET: BigQuery データセット
  • TABLE_NAME: BigQuery テーブル名
  • PATH_TO_TEMP_DIR_ON_GCS: 一時ディレクトリへの Cloud Storage パス
  • CONNECTION_PROPERTIES: 必要な場合は JDBC 接続プロパティ
  • CONNECTION_USERNAME: JDBC 接続のユーザー名
  • CONNECTION_PASSWORD: JDBC 接続パスワード
  • KMS_ENCRYPTION_KEY: Cloud KMS 暗号鍵

次のステップ