Pub/Sub to Java Database Connectivity(JDBC)テンプレートは、既存の Pub/Sub サブスクリプションから JSON 文字列としてデータを取り込み、結果のレコードを JDBC に書き込むストリーミング パイプラインです。
パイプラインの要件
- パイプラインを実行する前に Pub/Sub サブスクリプションが存在している必要があります。
- パイプラインを実行する前に JDBC ソースが存在している必要があります。
- パイプラインを実行する前に Pub/Sub 出力デッドレター トピックが存在している必要があります。
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
driverClassName |
JDBC ドライバのクラス名。例: com.mysql.jdbc.Driver |
connectionUrl |
JDBC 接続 URL 文字列。例: jdbc:mysql://some-host:3306/sampledb この値は、Cloud KMS 鍵で暗号化され、Base64 でエンコードされた文字列として渡すことができます。Base64 でエンコードされた文字列から空白文字を削除します。 |
driverJars |
JDBC ドライバのカンマ区切りの Cloud Storage パス。例: gs://your-bucket/driver_jar1.jar,gs://your-bucket/driver_jar2.jar |
username |
省略可: JDBC 接続に使用するユーザー名。この値は、Cloud KMS 鍵で暗号化された Base64 エンコード文字列として渡すことができます。 |
password |
省略可: JDBC 接続に使用するパスワード。この値は、Cloud KMS 鍵で暗号化された Base64 エンコード文字列として渡すことができます。 |
connectionProperties |
省略可: JDBC 接続に使用するプロパティ文字列。文字列の形式は [propertyName=property;]* にする必要があります。例: unicode=true;characterEncoding=UTF-8 |
statement |
データベースに対して実行するステートメント。このステートメントには、テーブルの列名を任意の順序で指定する必要があります。指定した列名の値のみが JSON から読み取られ、ステートメントに追加されます。たとえば、INSERT INTO tableName (column1, column2) VALUES (?,?) です。 |
inputSubscription |
読み込まれる Pub/Sub 入力サブスクリプション。projects/<project>/subscriptions/<subscription> の形式で指定します。 |
outputDeadletterTopic |
配信不能メッセージを転送するための Pub/Sub トピック。例: projects/<project-id>/topics/<topic-name> |
KMSEncryptionKey |
省略可: ユーザー名、パスワード、接続文字列を復号するための Cloud KMS 暗号鍵。Cloud KMS 鍵が渡された場合、ユーザー名、パスワード、接続文字列はすべて暗号化されて渡されます。 |
extraFilesToStage |
ワーカーにステージングするファイルのカンマ区切りの Cloud Storage パスまたは Secret Manager シークレット。これらのファイルは、各ワーカーの /extra_files ディレクトリに保存されます。例: gs://<my-bucket>/file.txt,projects/<project-id>/secrets/<secret-id>/versions/<version-id> |
テンプレートを実行する
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは
us-central1
です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、[ the Pub/Sub to JDBC template] を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud dataflow flex-template run JOB_NAME \ --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_Jdbc \ --region REGION_NAME \ --parameters \ driverClassName=DRIVER_CLASS_NAME,\ connectionURL=JDBC_CONNECTION_URL,\ driverJars=DRIVER_PATHS,\ username=CONNECTION_USERNAME,\ password=CONNECTION_PASSWORD,\ connectionProperties=CONNECTION_PROPERTIES,\ statement=SQL_STATEMENT,\ inputSubscription=INPUT_SUBSCRIPTION,\ outputDeadletterTopic=OUTPUT_DEADLETTER_TOPIC,\ KMSEncryptionKey=KMS_ENCRYPTION_KEY
次のように置き換えます。
JOB_NAME
: 一意の任意のジョブ名VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。- バージョン名(例:
2023-09-12-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
REGION_NAME
: Dataflow ジョブをデプロイするリージョン(例:us-central1
)DRIVER_CLASS_NAME
: ドライバのクラス名JDBC_CONNECTION_URL
: JDBC 接続 URLDRIVER_PATHS
: カンマで区切った JDBC ドライバの Cloud Storage パスCONNECTION_USERNAME
: JDBC 接続のユーザー名CONNECTION_PASSWORD
: JDBC 接続パスワードCONNECTION_PROPERTIES
: JDBC 接続プロパティ(必要に応じて)SQL_STATEMENT
: データベースに対して実行される SQL ステートメントINPUT_SUBSCRIPTION
: 読み取り元の Pub/Sub 入力サブスクリプションOUTPUT_DEADLETTER_TOPIC
: 配信不能メッセージを転送するための Pub/Sub トピックKMS_ENCRYPTION_KEY
: Cloud KMS 暗号鍵
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch
をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "jobName": "JOB_NAME", "parameters": { "driverClassName": "DRIVER_CLASS_NAME", "connectionURL": "JDBC_CONNECTION_URL", "driverJars": "DRIVER_PATHS", "username": "CONNECTION_USERNAME", "password": "CONNECTION_PASSWORD", "connectionProperties": "CONNECTION_PROPERTIES", "statement": "SQL_STATEMENT", "inputSubscription": "INPUT_SUBSCRIPTION", "outputDeadletterTopic": "OUTPUT_DEADLETTER_TOPIC", "KMSEncryptionKey":"KMS_ENCRYPTION_KEY" }, "environment": { "zone": "us-central1-f" }, }
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Google Cloud プロジェクトの IDJOB_NAME
: 一意の任意のジョブ名VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。- バージョン名(例:
2023-09-12-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
LOCATION
: Dataflow ジョブをデプロイするリージョン(例:us-central1
)DRIVER_CLASS_NAME
: ドライバのクラス名JDBC_CONNECTION_URL
: JDBC 接続 URLDRIVER_PATHS
: カンマで区切った JDBC ドライバの Cloud Storage パスCONNECTION_USERNAME
: JDBC 接続のユーザー名CONNECTION_PASSWORD
: JDBC 接続パスワードCONNECTION_PROPERTIES
: JDBC 接続プロパティ(必要に応じて)SQL_STATEMENT
: データベースに対して実行される SQL ステートメントINPUT_SUBSCRIPTION
: 読み取り元の Pub/Sub 入力サブスクリプションOUTPUT_DEADLETTER_TOPIC
: 配信不能メッセージを転送するための Pub/Sub トピックKMS_ENCRYPTION_KEY
: Cloud KMS 暗号鍵
次のステップ
- Dataflow テンプレートについて学習する。
- Google 提供のテンプレートのリストを確認する。