このドキュメントでは、Workflows でマネージド接続パイプラインを実行して、サードパーティ ソースから Dataplex にメタデータをインポートする方法について説明します。
マネージド接続パイプラインを設定するには、データソースのコネクタを構築します。次に、Workflows でパイプラインを実行します。このパイプラインは、データソースからメタデータを抽出し、メタデータを Dataplex にインポートします。必要に応じて、パイプラインは Google Cloud プロジェクトに Dataplex Catalog エントリ グループも作成します。
マネージド接続の詳細については、マネージド接続の概要をご覧ください。
始める前に
メタデータをインポートする前に、このセクションのタスクを完了します。
コネクタを構築する
コネクタはデータソースからメタデータを抽出し、Dataplex によってインポートできるメタデータ インポート ファイルを生成します。コネクタは、Dataproc Serverless で実行できる Artifact Registry イメージです。
サードパーティ ソースからメタデータを抽出するカスタム コネクタを構築します。
独自のコネクタを構築するための参照テンプレートとして使用できるコネクタの例については、メタデータのインポート用にカスタム コネクタを開発するをご覧ください。
Google Cloud リソースを構成する
-
Enable the Workflows, Dataproc, Cloud Storage, Dataplex, Secret Manager, Artifact Registry, and Cloud Scheduler APIs.
パイプラインをスケジュールに従って実行する予定がない場合は、Cloud Scheduler API を有効にする必要はありません。
Secret Manager でシークレットを作成して、サードパーティのデータソースの認証情報を保存します。
Dataproc Serverless for Spark ワークロードを実行するように Virtual Private Cloud(VPC)ネットワークを構成します。
メタデータのインポート ファイルを保存する Cloud Storage バケットを作成します。
次の Dataplex Catalog リソースを作成します。
インポートするエントリのカスタム アスペクト タイプを作成します。
インポートするエントリのカスタム エントリタイプを作成します。
必要なロール
サービス アカウントはワークフローの ID を表し、ワークフローが持つ権限と、アクセスできる Google Cloud リソースを定めます。Workflows 用(パイプラインを実行するため)と Dataproc Serverless 用(コネクタを実行するため)のサービス アカウントが必要です。
Compute Engine のデフォルトのサービス アカウント(PROJECT_NUMBER-compute@developer.gserviceaccount.com
)を使用するか、独自のサービス アカウント(またはアカウント)を作成して、マネージド接続パイプラインを実行できます。
Console
Google Cloud コンソールの [IAM] ページに移動します。
メタデータをインポートするプロジェクトを選択します。
[アクセス権を付与] をクリックし、サービス アカウントのメールアドレスを入力します。
サービス アカウントに次のロールを割り当てます。
- ログ書き込み
- Dataplex エントリ グループ オーナー
- Dataplex メタデータ ジョブ オーナー
- Dataplex Catalog 編集者
- Dataproc 編集者
- Dataproc ワーカー
- Secret Manager Secret Accessor - データソースの認証情報を保存するシークレット
- Storage オブジェクト ユーザー - Cloud Storage バケットに対して
- Artifact Registry 読み取り - コネクタ イメージを含む Artifact Registry リポジトリ
- サービス アカウント ユーザー - 異なるサービス アカウントを使用する場合は、ワークフローを実行するサービス アカウントに、Dataproc サーバーレス バッチジョブを実行するサービス アカウントに対するこのロールを付与します。
- Workflows 起動元 - パイプラインのスケジュールを設定する必要がある場合
変更を保存します。
gcloud
サービス アカウントにロールを付与します。次のコマンドを実行します。
gcloud projects add-iam-policy-binding PROJECT_ID \ --member="serviceAccount:SERVICE_ACCOUNT_ID" \ --role=roles/logging.logWriter gcloud projects add-iam-policy-binding PROJECT_ID \ --member="serviceAccount:SERVICE_ACCOUNT_ID" \ --role=roles/dataplex.entryGroupOwner gcloud projects add-iam-policy-binding PROJECT_ID \ --member="serviceAccount:SERVICE_ACCOUNT_ID" \ --role=roles/dataplex.metadataJobOwner gcloud projects add-iam-policy-binding PROJECT_ID \ --member="serviceAccount:SERVICE_ACCOUNT_ID" \ --role=roles/dataplex.catalogEditor gcloud projects add-iam-policy-binding PROJECT_ID \ --member="serviceAccount:SERVICE_ACCOUNT_ID" \ --role=roles/dataproc.editor gcloud projects add-iam-policy-binding PROJECT_ID \ --member="serviceAccount:SERVICE_ACCOUNT_ID" \ --role=roles/dataproc.worker
以下を置き換えます。
-
PROJECT_ID
: メタデータをインポートするターゲットの Google Cloud プロジェクトの名前。 SERVICE_ACCOUNT_ID
: サービス アカウント(my-service-account@my-project.iam.gserviceaccount.com
など)。
-
サービス アカウントにリソースレベルで次のロールを付与します。
gcloud secrets add-iam-policy-binding SECRET_ID \ --member="serviceAccount:SERVICE_ACCOUNT_ID" \ --role=roles/secretmanager.secretaccessor gcloud projects add-iam-policy-binding PROJECT_ID \ --member="serviceAccount:SERVICE_ACCOUNT_ID" \ --role=roles/storage.objectUser \ --condition=resource.name.startsWith('projects/_/buckets/BUCKET_ID') gcloud artifacts repositories add-iam-policy-binding REPOSITORY \ --location=REPOSITORY_LOCATION \ --member=SERVICE_ACCOUNT_ID} \ --role=roles/artifactregistry.reader
以下を置き換えます。
SECRET_ID
: データソースの認証情報を格納するシークレットの ID。形式はprojects/PROJECT_ID/secrets/SECRET_ID
です。BUCKET_ID
: Cloud Storage バケットの名前。REPOSITORY
: コネクタ イメージを含む Artifact Registry リポジトリ。REPOSITORY_LOCATION
: リポジトリがホストされている Google Cloud のロケーション。
Workflows を実行するサービス アカウントに、Dataproc Serverless バッチジョブを実行するサービス アカウントに対する
roles/iam.serviceAccountUser
ロールを付与します。Workflows と Dataproc Serverless の両方に同じサービス アカウントを使用する場合でも、このロールを付与する必要があります。gcloud iam service-accounts add-iam-policy-binding \ serviceAccount:SERVICE_ACCOUNT_ID \ --member='SERVICE_ACCOUNT_ID' \ --role='roles/iam.serviceAccountUser'
異なるサービス アカウントを使用する場合、
--member
フラグの値は、Dataproc サーバーレス バッチジョブを実行するサービス アカウントです。パイプラインのスケジュールを設定する場合は、サービス アカウントに次のロールを付与します。
gcloud projects add-iam-policy-binding PROJECT_ID \ --member="SERVICE_ACCOUNT_ID" \ --role=roles/workflows.invoker
メタデータのインポート
メタデータをインポートするには、マネージド接続パイプラインを実行するワークフローを作成して実行します。必要に応じて、パイプラインの実行スケジュールを作成することもできます。
Console
ワークフローを作成します。次の情報をお知らせください。
- サービス アカウント: このドキュメントの必要なロールで構成したサービス アカウント。
暗号化: [Google が管理する暗号鍵] を選択します。
ワークフローを定義する: 次の定義ファイルを指定します。
パイプラインをオンデマンドで実行するには、ワークフローを実行します。
次のランタイム引数を指定します。
以下を置き換えます。
-
PROJECT_ID
: メタデータをインポートするターゲットの Google Cloud プロジェクトの名前。 -
LOCATION_ID
: Dataproc サーバーレス ジョブとメタデータ インポート ジョブが実行され、メタデータがインポートされるターゲットの Google Cloud ロケーション。 -
ENTRY_GROUP_ID
: メタデータをインポートするエントリ グループの ID。エントリ グループ ID には、英小文字、数字、ハイフンを使用できます。このエントリ グループの完全なリソース名は
projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID
です。 -
CREATE_ENTRY_GROUP_BOOLEAN
: プロジェクトにエントリ グループがまだ存在しない場合に、パイプラインでエントリ グループを作成する場合は、この値をtrue
に設定します。 -
BUCKET_ID
: コネクタによって生成されたメタデータ インポート ファイルを保存する Cloud Storage バケットの名前。ワークフローの実行ごとに新しいフォルダが作成されます。 -
SERVICE_ACCOUNT_ID
: このドキュメントの必要なロールで構成したサービス アカウント。サービス アカウントは、Dataproc サーバーレスでコネクタを実行します。 -
ADDITIONAL_CONNECTOR_ARGUMENTS
: コネクタに渡す追加の引数のリスト。例については、メタデータのインポート用にカスタム コネクタを開発するをご覧ください。各引数を二重引用符で囲み、引数をカンマで区切ります。 -
CONTAINER_IMAGE
: Artifact Registry でホストされているコネクタのカスタム コンテナ イメージ。 -
ENTRY_TYPES
: インポートの対象となるエントリタイプのリスト(projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID
形式)。LOCATION_ID
は、メタデータをインポートする Google Cloud のロケーションと同じか、global
のいずれかである必要があります。 -
ASPECT_TYPES
: インポートの対象となるアスペクト タイプのリスト(projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID
の形式)。LOCATION_ID
は、メタデータをインポートする Google Cloud のロケーションと同じか、global
のいずれかである必要があります。 -
省略可:
NETWORK_TAGS
引数にネットワーク タグのリストを指定します。 -
省略可:
NETWORK_URI
引数に、データソースに接続する VPC ネットワークの URI を指定します。ネットワークを指定する場合は、サブネットワーク引数を省略します。 -
省略可:
SUBNETWORK_URI
引数に、データソースに接続するサブネットワークの URI を指定します。サブネットを指定する場合は、ネットワーク 引数を省略します。
インポートするメタデータの量によっては、パイプラインの実行に数分以上かかることがあります。進行状況を確認する方法の詳細については、ワークフローの実行結果にアクセスするをご覧ください。
パイプラインの実行が完了したら、Dataplex Catalog でインポートされたメタデータを検索できます。
-
省略可: パイプラインをスケジュールに従って実行する場合は、Cloud Scheduler を使用してスケジュールを作成します。次の情報をお知らせください。
- 頻度: パイプラインの実行スケジュールを定義する unix-cron 式。
- ワークフローの引数: 前の手順で説明したコネクタのランタイム引数。
- サービス アカウント: サービス アカウント。サービス アカウントがスケジューラを管理します。
gcloud
次のワークロード定義を YAML ファイルとして保存します。
Bash 変数を定義し、ワークフローを作成します。必要に応じて、パイプラインの実行スケジュールを作成します。
以下を置き換えます。
-
PROJECT_ID
: メタデータをインポートするターゲットの Google Cloud プロジェクトの名前。 -
LOCATION_ID
: Dataproc サーバーレス ジョブとメタデータ インポート ジョブが実行され、メタデータがインポートされるターゲットの Google Cloud ロケーション。 -
SERVICE_ACCOUNT_ID
: このドキュメントの必要なロールで構成したサービス アカウント。 WORKFLOW_DEFINITION_FILE
: ワークフロー定義 YAML ファイルのパス。WORKFLOW_NAME
: ワークフローの名前。WORKFLOW_ARGUMENTS
: コネクタに渡すランタイム引数。引数は JSON 形式です。Cloud Scheduler の場合、引用符で囲まれた文字列内の二重引用符は、バックスラッシュ(\)を使用してエスケープします(例:
--message-body="{\"argument\": \"{\\\"key\\\": \\\"value\\\"}\"}"
)。以下を置き換えます。
-
ENTRY_GROUP_ID
: メタデータをインポートするエントリ グループの ID。エントリ グループ ID には、英小文字、数字、ハイフンを使用できます。このエントリ グループの完全なリソース名は
projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID
です。 -
CREATE_ENTRY_GROUP_BOOLEAN
: プロジェクトにエントリ グループがまだ存在しない場合に、パイプラインでエントリ グループを作成する場合は、この値をtrue
に設定します。 -
BUCKET_ID
: コネクタによって生成されたメタデータ インポート ファイルを保存する Cloud Storage バケットの名前。ワークフローの実行ごとに新しいフォルダが作成されます。 -
ADDITIONAL_CONNECTOR_ARGUMENTS
: コネクタに渡す追加の引数のリスト。例については、メタデータのインポート用にカスタム コネクタを開発するをご覧ください。 -
CONTAINER_IMAGE
: Artifact Registry でホストされているコネクタのカスタム コンテナ イメージ。 -
ENTRY_TYPES
: インポートの対象となるエントリタイプのリスト(projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID
形式)。LOCATION_ID
は、メタデータをインポートする Google Cloud のロケーションと同じか、global
のいずれかである必要があります。 -
ASPECT_TYPES
: インポートの対象となるアスペクト タイプのリスト(projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID
の形式)。LOCATION_ID
は、メタデータをインポートする Google Cloud のロケーションと同じか、global
のいずれかである必要があります。 -
省略可:
NETWORK_TAGS
引数にネットワーク タグのリストを指定します。 -
省略可:
NETWORK_URI
引数に、データソースに接続する VPC ネットワークの URI を指定します。ネットワークを指定する場合は、サブネットワーク引数を省略します。 -
省略可:
SUBNETWORK_URI
引数に、データソースに接続するサブネットワークの URI を指定します。サブネットを指定する場合は、ネットワーク 引数を省略します。
-
CRON_SCHEDULE_EXPRESSION
: パイプラインの実行スケジュールを定義する cron 式。たとえば、毎日午前 0 時にスケジュールを実行するには、式0 0 * * *
を使用します。
-
パイプラインをオンデマンドで実行するには、ワークフローを実行します。
ワークフローの引数は JSON 形式ですが、エスケープされていません。
インポートするメタデータの量によっては、ワークフローの実行に数分以上かかることがあります。進行状況を確認する方法の詳細については、ワークフローの実行結果にアクセスするをご覧ください。
パイプラインの実行が完了したら、Dataplex Catalog でインポートされたメタデータを検索できます。
Terraform
cloud-dataplex
リポジトリのクローンを作成します。このリポジトリには、次の Terraform ファイルが含まれています。
main.tf
: 作成する Google Cloud リソースを定義します。variables.tf
: 変数を宣言します。byo-connector.tfvars
: マネージド接続パイプラインの変数を定義します。
.tfvars
ファイルを編集して、プレースホルダをコネクタの情報に置き換えます。以下を置き換えます。
-
PROJECT_ID
: メタデータをインポートするターゲットの Google Cloud プロジェクトの名前。 -
LOCATION_ID
: Dataproc サーバーレス ジョブとメタデータ インポート ジョブが実行され、メタデータがインポートされるターゲットの Google Cloud ロケーション。 -
SERVICE_ACCOUNT_ID
: このドキュメントの必要なロールで構成したサービス アカウント。 -
CRON_SCHEDULE_EXPRESSION
: パイプラインの実行スケジュールを定義する cron 式。たとえば、毎日午前 0 時にスケジュールを実行するには、式0 0 * * *
を使用します。 -
ENTRY_GROUP_ID
: メタデータをインポートするエントリ グループの ID。エントリ グループ ID には、英小文字、数字、ハイフンを使用できます。このエントリ グループの完全なリソース名は
projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID
です。 -
CREATE_ENTRY_GROUP_BOOLEAN
: プロジェクトにエントリ グループがまだ存在しない場合に、パイプラインでエントリ グループを作成する場合は、この値をtrue
に設定します。 -
BUCKET_ID
: コネクタによって生成されたメタデータ インポート ファイルを保存する Cloud Storage バケットの名前。ワークフローの実行ごとに新しいフォルダが作成されます。 -
ADDITIONAL_CONNECTOR_ARGUMENTS
: コネクタに渡す追加の引数のリスト。例については、メタデータのインポート用にカスタム コネクタを開発するをご覧ください。各引数を二重引用符で囲み、引数をカンマで区切ります。 -
CONTAINER_IMAGE
: Artifact Registry でホストされているコネクタのカスタム コンテナ イメージ。 -
ENTRY_TYPES
: インポートの対象となるエントリタイプのリスト(projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID
形式)。LOCATION_ID
は、メタデータをインポートする Google Cloud のロケーションと同じか、global
のいずれかである必要があります。 -
ASPECT_TYPES
: インポートの対象となるアスペクト タイプのリスト(projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID
の形式)。LOCATION_ID
は、メタデータをインポートする Google Cloud のロケーションと同じか、global
のいずれかである必要があります。 -
省略可:
NETWORK_TAGS
引数にネットワーク タグのリストを指定します。 -
省略可:
NETWORK_URI
引数に、データソースに接続する VPC ネットワークの URI を指定します。ネットワークを指定する場合は、サブネットワーク引数を省略します。 -
省略可:
SUBNETWORK_URI
引数に、データソースに接続するサブネットワークの URI を指定します。サブネットを指定する場合は、ネットワーク 引数を省略します。
-
Terraform を初期化します。
terraform init
.tfvars
ファイルを使用して Terraform を検証します。terraform plan --var-file=CONNECTOR_VARIABLES_FILE.tfvars
CONNECTOR_VARIABLES_FILE
は、変数定義ファイルの名前に置き換えます。.tfvars
ファイルを使用して Terraform をデプロイします。terraform apply --var-file=CONNECTOR_VARIABLES_FILE.tfvars
Terraform は、指定されたプロジェクトにワークフローと Cloud Scheduler ジョブを作成します。Workflows は、指定したスケジュールでパイプラインを実行します。
インポートするメタデータの量によっては、ワークフローの実行に数分以上かかることがあります。進行状況を確認する方法の詳細については、ワークフローの実行結果にアクセスするをご覧ください。
パイプラインの実行が完了したら、Dataplex Catalog でインポートされたメタデータを検索できます。
ジョブのログを表示
Cloud Logging を使用して、マネージド接続パイプラインのログを表示します。ログペイロードには、関連する場合は、Dataproc Serverless バッチジョブとメタデータ インポート ジョブのログへのリンクが含まれます。詳細については、ワークフローのログを表示するをご覧ください。
トラブルシューティング
次のトラブルシューティングのヒントを参考にしてください。
- 情報レベルのロギングではなくデバッグレベルのロギングを使用するように、メタデータ ジョブのインポート ジョブのログレベルを構成します。
- Dataproc サーバーレス バッチジョブ(コネクタの実行用)とメタデータ インポート ジョブのログを確認します。詳細については、Dataproc Serverless for Spark のログにクエリを実行すると メタデータ ジョブログにクエリを実行するをご覧ください。
- パイプラインを使用してエントリをインポートできず、エラー メッセージに十分な情報がない場合は、テスト用エントリ グループに同じ詳細情報を持つカスタム エントリを作成してみてください。詳細については、カスタム エントリを作成するをご覧ください。