Workflows を使用してカスタムソースからメタデータをインポートする

このドキュメントでは、Workflows でマネージド接続パイプラインを実行して、サードパーティ ソースから Dataplex にメタデータをインポートする方法について説明します。

マネージド接続パイプラインを設定するには、データソースのコネクタを構築します。次に、Workflows でパイプラインを実行します。このパイプラインは、データソースからメタデータを抽出し、メタデータを Dataplex にインポートします。必要に応じて、パイプラインは Google Cloud プロジェクトに Dataplex Catalog エントリ グループも作成します。

マネージド接続の詳細については、マネージド接続の概要をご覧ください。

始める前に

メタデータをインポートする前に、このセクションのタスクを完了します。

コネクタを構築する

コネクタはデータソースからメタデータを抽出し、Dataplex によってインポートできるメタデータ インポート ファイルを生成します。コネクタは、Dataproc Serverless で実行できる Artifact Registry イメージです。

Google Cloud リソースを構成する

  1. Enable the Workflows, Dataproc, Cloud Storage, Dataplex, Secret Manager, Artifact Registry, and Cloud Scheduler APIs.

    Enable the APIs

    パイプラインをスケジュールに従って実行する予定がない場合は、Cloud Scheduler API を有効にする必要はありません。

  2. Secret Manager でシークレットを作成して、サードパーティのデータソースの認証情報を保存します。

  3. Dataproc Serverless for Spark ワークロードを実行するように Virtual Private Cloud(VPC)ネットワークを構成します。

  4. メタデータのインポート ファイルを保存する Cloud Storage バケットを作成します。

  5. 次の Dataplex Catalog リソースを作成します。

    1. インポートするエントリのカスタム アスペクト タイプを作成します。

    2. インポートするエントリのカスタム エントリタイプを作成します。

必要なロール

サービス アカウントはワークフローの ID を表し、ワークフローが持つ権限と、アクセスできる Google Cloud リソースを定めます。Workflows 用(パイプラインを実行するため)と Dataproc Serverless 用(コネクタを実行するため)のサービス アカウントが必要です。

Compute Engine のデフォルトのサービス アカウント(PROJECT_NUMBER-compute@developer.gserviceaccount.com)を使用するか、独自のサービス アカウント(またはアカウント)を作成して、マネージド接続パイプラインを実行できます。

Console

  1. Google Cloud コンソールの [IAM] ページに移動します。

    [IAM] に移動

  2. メタデータをインポートするプロジェクトを選択します。

  3. [アクセス権を付与] をクリックし、サービス アカウントのメールアドレスを入力します。

  4. サービス アカウントに次のロールを割り当てます。

    • ログ書き込み
    • Dataplex エントリ グループ オーナー
    • Dataplex メタデータ ジョブ オーナー
    • Dataplex Catalog 編集者
    • Dataproc 編集者
    • Dataproc ワーカー
    • Secret Manager Secret Accessor - データソースの認証情報を保存するシークレット
    • Storage オブジェクト ユーザー - Cloud Storage バケットに対して
    • Artifact Registry 読み取り - コネクタ イメージを含む Artifact Registry リポジトリ
    • サービス アカウント ユーザー - 異なるサービス アカウントを使用する場合は、ワークフローを実行するサービス アカウントに、Dataproc サーバーレス バッチジョブを実行するサービス アカウントに対するこのロールを付与します。
    • Workflows 起動元 - パイプラインのスケジュールを設定する必要がある場合
  5. 変更を保存します。

gcloud

  1. サービス アカウントにロールを付与します。次のコマンドを実行します。

    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/logging.logWriter
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataplex.entryGroupOwner
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataplex.metadataJobOwner
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataplex.catalogEditor
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataproc.editor
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataproc.worker
    

    以下を置き換えます。

    • PROJECT_ID: メタデータをインポートするターゲットの Google Cloud プロジェクトの名前。
    • SERVICE_ACCOUNT_ID: サービス アカウント(my-service-account@my-project.iam.gserviceaccount.com など)。
  2. サービス アカウントにリソースレベルで次のロールを付与します。

    gcloud secrets add-iam-policy-binding SECRET_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/secretmanager.secretaccessor
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/storage.objectUser \
        --condition=resource.name.startsWith('projects/_/buckets/BUCKET_ID')
    gcloud artifacts repositories add-iam-policy-binding REPOSITORY \
        --location=REPOSITORY_LOCATION \
        --member=SERVICE_ACCOUNT_ID} \
        --role=roles/artifactregistry.reader
    

    以下を置き換えます。

    • SECRET_ID: データソースの認証情報を格納するシークレットの ID。形式は projects/PROJECT_ID/secrets/SECRET_ID です。
    • BUCKET_ID: Cloud Storage バケットの名前。
    • REPOSITORY: コネクタ イメージを含む Artifact Registry リポジトリ。
    • REPOSITORY_LOCATION: リポジトリがホストされている Google Cloud のロケーション。
  3. Workflows を実行するサービス アカウントに、Dataproc Serverless バッチジョブを実行するサービス アカウントに対する roles/iam.serviceAccountUser ロールを付与します。Workflows と Dataproc Serverless の両方に同じサービス アカウントを使用する場合でも、このロールを付与する必要があります。

    gcloud iam service-accounts add-iam-policy-binding \
        serviceAccount:SERVICE_ACCOUNT_ID \
        --member='SERVICE_ACCOUNT_ID' \
        --role='roles/iam.serviceAccountUser'
    

    異なるサービス アカウントを使用する場合、--member フラグの値は、Dataproc サーバーレス バッチジョブを実行するサービス アカウントです。

  4. パイプラインのスケジュールを設定する場合は、サービス アカウントに次のロールを付与します。

    gcloud projects add-iam-policy-binding PROJECT_ID \
     --member="SERVICE_ACCOUNT_ID" \
     --role=roles/workflows.invoker
    

メタデータのインポート

メタデータをインポートするには、マネージド接続パイプラインを実行するワークフローを作成して実行します。必要に応じて、パイプラインの実行スケジュールを作成することもできます。

Console

  1. ワークフローを作成します。次の情報をお知らせください。

    • サービス アカウント: このドキュメントの必要なロールで構成したサービス アカウント。
    • 暗号化: [Google が管理する暗号鍵] を選択します。

    • ワークフローを定義する: 次の定義ファイルを指定します。

      main:
        params: [args]
        steps:
          - init:
              assign:
              - WORKFLOW_ID: ${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
              - NETWORK_URI: ${default(map.get(args, "NETWORK_URI"), "")}
              - SUBNETWORK_URI: ${default(map.get(args, "SUBNETWORK_URI"), "")}
              - NETWORK_TAGS: ${default(map.get(args, "NETWORK_TAGS"), [])}
      
          - check_networking:
              switch:
                - condition: ${NETWORK_URI != "" and SUBNETWORK_URI != ""}
                  raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one."
                - condition: ${NETWORK_URI != ""}
                  steps:
                    - submit_extract_job_with_network_uri:
                        assign:
                          - NETWORKING: ${NETWORK_URI}
                          - NETWORK_TYPE: "networkUri"
                - condition: ${SUBNETWORK_URI != ""}
                  steps:
                    - submit_extract_job_with_subnetwork_uri:
                        assign:
                          - NETWORKING: ${SUBNETWORK_URI}
                          - NETWORK_TYPE: "subnetworkUri"
              next: set_default_networking
      
          - set_default_networking:
              assign:
                - NETWORK_TYPE: "networkUri"
                - NETWORKING: ${"projects/" + args.TARGET_PROJECT_ID + "/global/networks/default"}
              next: check_create_target_entry_group
      
          - check_create_target_entry_group:
              switch:
                - condition: ${args.CREATE_TARGET_ENTRY_GROUP == true}
                  next: create_target_entry_group
                - condition: ${args.CREATE_TARGET_ENTRY_GROUP == false}
                  next: generate_extract_job_link
      
          - create_target_entry_group:
              call: http.post
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              next: generate_extract_job_link
      
          - generate_extract_job_link:
              call: sys.log
              args:
                  data: ${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}
                  severity: "INFO"
              next: submit_pyspark_extract_job
      
          - submit_pyspark_extract_job:
              call: http.post
              args:
                url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
                headers:
                  Content-Type: "application/json"
                query:
                  batchId: ${WORKFLOW_ID}
                body:
                  pysparkBatch:
                    mainPythonFileUri: file:///main.py
                    args:
                      - ${"--target_project_id=" + args.TARGET_PROJECT_ID}
                      - ${"--target_location_id=" + args.CLOUD_REGION}
                      - ${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                      - ${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                      - ${"--output_folder=" + WORKFLOW_ID}
                      - ${args.ADDITIONAL_CONNECTOR_ARGS}
                  runtimeConfig:
                      containerImage: ${args.CUSTOM_CONTAINER_IMAGE}
                  environmentConfig:
                      executionConfig:
                          serviceAccount: ${args.SERVICE_ACCOUNT}
                          stagingBucket: ${args.CLOUD_STORAGE_BUCKET_ID}
                          ${NETWORK_TYPE}: ${NETWORKING}
                          networkTags: ${NETWORK_TAGS}
              result: RESPONSE_MESSAGE
              next: check_pyspark_extract_job
      
          - check_pyspark_extract_job:
              call: http.get
              args:
                url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: PYSPARK_EXTRACT_JOB_STATUS
              next: check_pyspark_extract_job_done
      
          - check_pyspark_extract_job_done:
              switch:
                - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}
                  next: generate_import_logs_link
                - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"}
                  raise: ${PYSPARK_EXTRACT_JOB_STATUS}
                - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}
                  raise: ${PYSPARK_EXTRACT_JOB_STATUS}
              next: pyspark_extract_job_wait
      
          - pyspark_extract_job_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: check_pyspark_extract_job
      
          - generate_import_logs_link:
              call: sys.log
              args:
                  data: ${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}
                  severity: "INFO"
              next: submit_import_job
      
          - submit_import_job:
              call: http.post
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
                body:
                  type: IMPORT
                  import_spec:
                    source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                    entry_sync_mode: FULL
                    aspect_sync_mode: INCREMENTAL
                    log_level: ${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}
                    scope:
                      entry_groups: 
                        - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                      entry_types: ${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}
                      aspect_types: ${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}
              result: IMPORT_JOB_RESPONSE
              next: get_job_start_time
      
          - get_job_start_time:
              assign:
                - importJobStartTime: ${sys.now()}
              next: import_job_startup_wait
      
          - import_job_startup_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: initial_get_import_job
      
          - initial_get_import_job:
              call: http.get
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: IMPORT_JOB_STATUS
              next: check_import_job_status_available
      
          - check_import_job_status_available:
              switch:
                - condition: ${sys.now() - importJobStartTime > 300}  # 5 minutes = 300 seconds
                  next: kill_import_job
                - condition: ${"status" in IMPORT_JOB_STATUS.body}
                  next: check_import_job_done
              next: import_job_status_wait
      
          - import_job_status_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: check_import_job_status_available
      
          - check_import_job_done:
              switch:
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}
                  next: the_end
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}
                  raise: ${IMPORT_JOB_STATUS}
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}
                  raise: ${IMPORT_JOB_STATUS}
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "FAILED"}
                  raise: ${IMPORT_JOB_STATUS}
                - condition: ${sys.now() - importJobStartTime > 43200}  # 12 hours = 43200 seconds
                  next: kill_import_job
              next: import_job_wait
      
          - get_import_job:
              call: http.get
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: IMPORT_JOB_STATUS
              next: check_import_job_done
      
          - import_job_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: get_import_job
      
          - kill_import_job:
              call: http.post
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              next: get_killed_import_job
      
          - get_killed_import_job:
              call: http.get
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: KILLED_IMPORT_JOB_STATUS
              next: killed
      
          - killed:
              raise: ${KILLED_IMPORT_JOB_STATUS}
      
          - the_end:
              return: ${IMPORT_JOB_STATUS}
  2. パイプラインをオンデマンドで実行するには、ワークフローを実行します。

    次のランタイム引数を指定します。

    {
        "TARGET_PROJECT_ID": "PROJECT_ID",
        "CLOUD_REGION": "LOCATION_ID",
        "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID",
        "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN,
        "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID",
        "SERVICE_ACCOUNT": "SERVICE_ACCOUNT_ID",
        "ADDITIONAL_CONNECTOR_ARGS": [ADDITIONAL_CONNECTOR_ARGUMENTS],
        "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE",
        "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES],
        "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES],
        "IMPORT_JOB_LOG_LEVEL": "INFO",
        "NETWORK_TAGS": [],
        "NETWORK_URI": "",
        "SUBNETWORK_URI": ""
     }
    

    以下を置き換えます。

    • PROJECT_ID: メタデータをインポートするターゲットの Google Cloud プロジェクトの名前。
    • LOCATION_ID: Dataproc サーバーレス ジョブとメタデータ インポート ジョブが実行され、メタデータがインポートされるターゲットの Google Cloud ロケーション。
    • ENTRY_GROUP_ID: メタデータをインポートするエントリ グループの ID。エントリ グループ ID には、英小文字、数字、ハイフンを使用できます。

      このエントリ グループの完全なリソース名は projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID です。

    • CREATE_ENTRY_GROUP_BOOLEAN: プロジェクトにエントリ グループがまだ存在しない場合に、パイプラインでエントリ グループを作成する場合は、この値を true に設定します。
    • BUCKET_ID: コネクタによって生成されたメタデータ インポート ファイルを保存する Cloud Storage バケットの名前。ワークフローの実行ごとに新しいフォルダが作成されます。
    • SERVICE_ACCOUNT_ID: このドキュメントの必要なロールで構成したサービス アカウント。サービス アカウントは、Dataproc サーバーレスでコネクタを実行します。
    • ADDITIONAL_CONNECTOR_ARGUMENTS: コネクタに渡す追加の引数のリスト。例については、メタデータのインポート用にカスタム コネクタを開発するをご覧ください。各引数を二重引用符で囲み、引数をカンマで区切ります。
    • CONTAINER_IMAGE: Artifact Registry でホストされているコネクタのカスタム コンテナ イメージ。
    • ENTRY_TYPES: インポートの対象となるエントリタイプのリスト(projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID 形式)。LOCATION_ID は、メタデータをインポートする Google Cloud のロケーションと同じか、global のいずれかである必要があります。
    • ASPECT_TYPES: インポートの対象となるアスペクト タイプのリスト(projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID の形式)。LOCATION_ID は、メタデータをインポートする Google Cloud のロケーションと同じか、global のいずれかである必要があります。
    • 省略可: NETWORK_TAGS 引数にネットワーク タグのリストを指定します。
    • 省略可: NETWORK_URI 引数に、データソースに接続する VPC ネットワークの URI を指定します。ネットワークを指定する場合は、サブネットワーク引数を省略します。
    • 省略可: SUBNETWORK_URI 引数に、データソースに接続するサブネットワークの URI を指定します。サブネットを指定する場合は、ネットワーク 引数を省略します。

    インポートするメタデータの量によっては、パイプラインの実行に数分以上かかることがあります。進行状況を確認する方法の詳細については、ワークフローの実行結果にアクセスするをご覧ください。

    パイプラインの実行が完了したら、Dataplex Catalog でインポートされたメタデータを検索できます。

  3. 省略可: パイプラインをスケジュールに従って実行する場合は、Cloud Scheduler を使用してスケジュールを作成します。次の情報をお知らせください。

    • 頻度: パイプラインの実行スケジュールを定義する unix-cron 式。
    • ワークフローの引数: 前の手順で説明したコネクタのランタイム引数。
    • サービス アカウント: サービス アカウント。サービス アカウントがスケジューラを管理します。

gcloud

  1. 次のワークロード定義を YAML ファイルとして保存します。

    main:
      params: [args]
      steps:
        - init:
            assign:
            - WORKFLOW_ID: ${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
            - NETWORK_URI: ${default(map.get(args, "NETWORK_URI"), "")}
            - SUBNETWORK_URI: ${default(map.get(args, "SUBNETWORK_URI"), "")}
            - NETWORK_TAGS: ${default(map.get(args, "NETWORK_TAGS"), [])}
    
        - check_networking:
            switch:
              - condition: ${NETWORK_URI != "" and SUBNETWORK_URI != ""}
                raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one."
              - condition: ${NETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_network_uri:
                      assign:
                        - NETWORKING: ${NETWORK_URI}
                        - NETWORK_TYPE: "networkUri"
              - condition: ${SUBNETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_subnetwork_uri:
                      assign:
                        - NETWORKING: ${SUBNETWORK_URI}
                        - NETWORK_TYPE: "subnetworkUri"
            next: set_default_networking
    
        - set_default_networking:
            assign:
              - NETWORK_TYPE: "networkUri"
              - NETWORKING: ${"projects/" + args.TARGET_PROJECT_ID + "/global/networks/default"}
            next: check_create_target_entry_group
    
        - check_create_target_entry_group:
            switch:
              - condition: ${args.CREATE_TARGET_ENTRY_GROUP == true}
                next: create_target_entry_group
              - condition: ${args.CREATE_TARGET_ENTRY_GROUP == false}
                next: generate_extract_job_link
    
        - create_target_entry_group:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: generate_extract_job_link
    
        - generate_extract_job_link:
            call: sys.log
            args:
                data: ${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}
                severity: "INFO"
            next: submit_pyspark_extract_job
    
        - submit_pyspark_extract_job:
            call: http.post
            args:
              url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              headers:
                Content-Type: "application/json"
              query:
                batchId: ${WORKFLOW_ID}
              body:
                pysparkBatch:
                  mainPythonFileUri: file:///main.py
                  args:
                    - ${"--target_project_id=" + args.TARGET_PROJECT_ID}
                    - ${"--target_location_id=" + args.CLOUD_REGION}
                    - ${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                    - ${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                    - ${"--output_folder=" + WORKFLOW_ID}
                    - ${args.ADDITIONAL_CONNECTOR_ARGS}
                runtimeConfig:
                    containerImage: ${args.CUSTOM_CONTAINER_IMAGE}
                environmentConfig:
                    executionConfig:
                        serviceAccount: ${args.SERVICE_ACCOUNT}
                        stagingBucket: ${args.CLOUD_STORAGE_BUCKET_ID}
                        ${NETWORK_TYPE}: ${NETWORKING}
                        networkTags: ${NETWORK_TAGS}
            result: RESPONSE_MESSAGE
            next: check_pyspark_extract_job
    
        - check_pyspark_extract_job:
            call: http.get
            args:
              url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: PYSPARK_EXTRACT_JOB_STATUS
            next: check_pyspark_extract_job_done
    
        - check_pyspark_extract_job_done:
            switch:
              - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}
                next: generate_import_logs_link
              - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"}
                raise: ${PYSPARK_EXTRACT_JOB_STATUS}
              - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}
                raise: ${PYSPARK_EXTRACT_JOB_STATUS}
            next: pyspark_extract_job_wait
    
        - pyspark_extract_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_pyspark_extract_job
    
        - generate_import_logs_link:
            call: sys.log
            args:
                data: ${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}
                severity: "INFO"
            next: submit_import_job
    
        - submit_import_job:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              body:
                type: IMPORT
                import_spec:
                  source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                  entry_sync_mode: FULL
                  aspect_sync_mode: INCREMENTAL
                  log_level: ${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}
                  scope:
                    entry_groups: 
                      - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                    entry_types: ${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}
                    aspect_types: ${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}
            result: IMPORT_JOB_RESPONSE
            next: get_job_start_time
    
        - get_job_start_time:
            assign:
              - importJobStartTime: ${sys.now()}
            next: import_job_startup_wait
    
        - import_job_startup_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: initial_get_import_job
    
        - initial_get_import_job:
            call: http.get
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_status_available
    
        - check_import_job_status_available:
            switch:
              - condition: ${sys.now() - importJobStartTime > 300}  # 5 minutes = 300 seconds
                next: kill_import_job
              - condition: ${"status" in IMPORT_JOB_STATUS.body}
                next: check_import_job_done
            next: import_job_status_wait
    
        - import_job_status_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_import_job_status_available
    
        - check_import_job_done:
            switch:
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}
                next: the_end
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}
                raise: ${IMPORT_JOB_STATUS}
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}
                raise: ${IMPORT_JOB_STATUS}
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "FAILED"}
                raise: ${IMPORT_JOB_STATUS}
              - condition: ${sys.now() - importJobStartTime > 43200}  # 12 hours = 43200 seconds
                next: kill_import_job
            next: import_job_wait
    
        - get_import_job:
            call: http.get
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_done
    
        - import_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: get_import_job
    
        - kill_import_job:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: get_killed_import_job
    
        - get_killed_import_job:
            call: http.get
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: KILLED_IMPORT_JOB_STATUS
            next: killed
    
        - killed:
            raise: ${KILLED_IMPORT_JOB_STATUS}
    
        - the_end:
            return: ${IMPORT_JOB_STATUS}
  2. Bash 変数を定義し、ワークフローを作成します。必要に応じて、パイプラインの実行スケジュールを作成します。

    # Define Bash variables (replace with your actual values)
    project_id="PROJECT_ID"
    region="LOCATION_ID"
    service_account="SERVICE_ACCOUNT_ID"
    workflow_source="WORKFLOW_DEFINITION_FILE.yaml"
    workflow_name="WORKFLOW_NAME"
    workflow_args='WORKFLOW_ARGUMENTS'
    
    # Create Workflows resource
    gcloud workflows deploy ${workflow_name} \
      --project=${project_id} \
      --location=${region} \
      --source=${workflow_source} \
      --service-account=${service_account}
    
    # Create Cloud Scheduler job
    gcloud scheduler jobs create http ${workflow_name}-scheduler \
      --project=${project_id} \
      --location=${region} \
      --schedule="CRON_SCHEDULE_EXPRESSION" \
      --time-zone="UTC" \
      --uri="https://workflowexecutions.googleapis.com/v1/projects/${project_id}/locations/${region}/workflows/${workflow_name}/executions" \
      --http-method="POST" \
      --oauth-service-account-email=${service_account} \
      --headers="Content-Type=application/json" \
      --message-body='{"argument": ${workflow_args}}'
    

    以下を置き換えます。

    • PROJECT_ID: メタデータをインポートするターゲットの Google Cloud プロジェクトの名前。
    • LOCATION_ID: Dataproc サーバーレス ジョブとメタデータ インポート ジョブが実行され、メタデータがインポートされるターゲットの Google Cloud ロケーション。
    • SERVICE_ACCOUNT_ID: このドキュメントの必要なロールで構成したサービス アカウント。
    • WORKFLOW_DEFINITION_FILE: ワークフロー定義 YAML ファイルのパス。
    • WORKFLOW_NAME: ワークフローの名前。
    • WORKFLOW_ARGUMENTS: コネクタに渡すランタイム引数。引数は JSON 形式です。

      {
          "TARGET_PROJECT_ID": "PROJECT_ID",
          "CLOUD_REGION": "LOCATION_ID",
          "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID",
          "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN,
          "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID",
          "SERVICE_ACCOUNT": "SERVICE_ACCOUNT_ID",
          "ADDITIONAL_CONNECTOR_ARGS": [ADDITIONAL_CONNECTOR_ARGUMENTS],
          "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE",
          "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES],
          "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES],
          "IMPORT_JOB_LOG_LEVEL": "INFO",
          "NETWORK_TAGS": [],
          "NETWORK_URI": "",
          "SUBNETWORK_URI": ""
       }
      

      Cloud Scheduler の場合、引用符で囲まれた文字列内の二重引用符は、バックスラッシュ(\)を使用してエスケープします(例: --message-body="{\"argument\": \"{\\\"key\\\": \\\"value\\\"}\"}")。

      以下を置き換えます。

      • ENTRY_GROUP_ID: メタデータをインポートするエントリ グループの ID。エントリ グループ ID には、英小文字、数字、ハイフンを使用できます。

        このエントリ グループの完全なリソース名は projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID です。

      • CREATE_ENTRY_GROUP_BOOLEAN: プロジェクトにエントリ グループがまだ存在しない場合に、パイプラインでエントリ グループを作成する場合は、この値を true に設定します。
      • BUCKET_ID: コネクタによって生成されたメタデータ インポート ファイルを保存する Cloud Storage バケットの名前。ワークフローの実行ごとに新しいフォルダが作成されます。
      • ADDITIONAL_CONNECTOR_ARGUMENTS: コネクタに渡す追加の引数のリスト。例については、メタデータのインポート用にカスタム コネクタを開発するをご覧ください。
      • CONTAINER_IMAGE: Artifact Registry でホストされているコネクタのカスタム コンテナ イメージ。
      • ENTRY_TYPES: インポートの対象となるエントリタイプのリスト(projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID 形式)。LOCATION_ID は、メタデータをインポートする Google Cloud のロケーションと同じか、global のいずれかである必要があります。
      • ASPECT_TYPES: インポートの対象となるアスペクト タイプのリスト(projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID の形式)。LOCATION_ID は、メタデータをインポートする Google Cloud のロケーションと同じか、global のいずれかである必要があります。
      • 省略可: NETWORK_TAGS 引数にネットワーク タグのリストを指定します。
      • 省略可: NETWORK_URI 引数に、データソースに接続する VPC ネットワークの URI を指定します。ネットワークを指定する場合は、サブネットワーク引数を省略します。
      • 省略可: SUBNETWORK_URI 引数に、データソースに接続するサブネットワークの URI を指定します。サブネットを指定する場合は、ネットワーク 引数を省略します。
    • CRON_SCHEDULE_EXPRESSION: パイプラインの実行スケジュールを定義する cron 式。たとえば、毎日午前 0 時にスケジュールを実行するには、式 0 0 * * * を使用します。

  3. パイプラインをオンデマンドで実行するには、ワークフローを実行します。

    workflow_name="WORKFLOW_NAME"
    workflow_args='WORKFLOW_ARGUMENTS'
    
    gcloud workflows run "${workflow_name}" --project=${project_id} --location=${location} --data '${workflow_args}'
    

    ワークフローの引数は JSON 形式ですが、エスケープされていません。

    インポートするメタデータの量によっては、ワークフローの実行に数分以上かかることがあります。進行状況を確認する方法の詳細については、ワークフローの実行結果にアクセスするをご覧ください。

    パイプラインの実行が完了したら、Dataplex Catalog でインポートされたメタデータを検索できます。

Terraform

  1. cloud-dataplex リポジトリのクローンを作成します。

    このリポジトリには、次の Terraform ファイルが含まれています。

    • main.tf: 作成する Google Cloud リソースを定義します。
    • variables.tf: 変数を宣言します。
    • byo-connector.tfvars: マネージド接続パイプラインの変数を定義します。
  2. .tfvars ファイルを編集して、プレースホルダをコネクタの情報に置き換えます。

    project_id                      = "PROJECT_ID"
    region                          = "LOCATION_ID"
    service_account                 = "SERVICE_ACCOUNT_ID"
    cron_schedule                   = "CRON_SCHEDULE_EXPRESSION"
    workflow_args                   = {"TARGET_PROJECT_ID": "PROJECT_ID", "CLOUD_REGION": "LOCATION_ID", "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID", "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN, "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID", "SERVICE_ACCOUNT": "SERVICE_ACCOUNT_ID", "ADDITIONAL_CONNECTOR_ARGS": [ADDITIONAL_CONNECTOR_ARGUMENTS], "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE", "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES], "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES], "IMPORT_JOB_LOG_LEVEL": "INFO", "NETWORK_TAGS": [], "NETWORK_URI": "", "SUBNETWORK_URI": ""}
    
    
    workflow_source                 = <<EOF
    main:
      params: [args]
      steps:
        - init:
            assign:
            - WORKFLOW_ID: $${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
            - NETWORK_URI: $${default(map.get(args, "NETWORK_URI"), "")}
            - SUBNETWORK_URI: $${default(map.get(args, "SUBNETWORK_URI"), "")}
            - NETWORK_TAGS: $${default(map.get(args, "NETWORK_TAGS"), [])}
    
        - check_networking:
            switch:
              - condition: $${NETWORK_URI != "" and SUBNETWORK_URI != ""}
                raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one."
              - condition: $${NETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_network_uri:
                      assign:
                        - NETWORKING: $${NETWORK_URI}
                        - NETWORK_TYPE: "networkUri"
              - condition: $${SUBNETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_subnetwork_uri:
                      assign:
                        - NETWORKING: $${SUBNETWORK_URI}
                        - NETWORK_TYPE: "subnetworkUri"
            next: set_default_networking
    
        - set_default_networking:
            assign:
              - NETWORK_TYPE: "networkUri"
              - NETWORKING: $${"projects/" + args.TARGET_PROJECT_ID + "/global/networks/default"}
            next: check_create_target_entry_group
    
        - check_create_target_entry_group:
            switch:
              - condition: $${args.CREATE_TARGET_ENTRY_GROUP == true}
                next: create_target_entry_group
              - condition: $${args.CREATE_TARGET_ENTRY_GROUP == false}
                next: generate_extract_job_link
    
        - create_target_entry_group:
            call: http.post
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: generate_extract_job_link
    
        - generate_extract_job_link:
            call: sys.log
            args:
                data: $${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}
                severity: "INFO"
            next: submit_pyspark_extract_job
    
        - submit_pyspark_extract_job:
            call: http.post
            args:
              url: $${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              headers:
                Content-Type: "application/json"
              query:
                batchId: $${WORKFLOW_ID}
              body:
                pysparkBatch:
                  mainPythonFileUri: file:///main.py
                  args:
                    - $${"--target_project_id=" + args.TARGET_PROJECT_ID}
                    - $${"--target_location_id=" + args.CLOUD_REGION}
                    - $${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                    - $${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                    - $${"--output_folder=" + WORKFLOW_ID}
                    - $${args.ADDITIONAL_CONNECTOR_ARGS}
                runtimeConfig:
                    containerImage: $${args.CUSTOM_CONTAINER_IMAGE}
                environmentConfig:
                    executionConfig:
                        serviceAccount: $${args.SERVICE_ACCOUNT}
                        stagingBucket: $${args.CLOUD_STORAGE_BUCKET_ID}
                        $${NETWORK_TYPE}: $${NETWORKING}
                        networkTags: $${NETWORK_TAGS}
            result: RESPONSE_MESSAGE
            next: check_pyspark_extract_job
    
        - check_pyspark_extract_job:
            call: http.get
            args:
              url: $${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: PYSPARK_EXTRACT_JOB_STATUS
            next: check_pyspark_extract_job_done
    
        - check_pyspark_extract_job_done:
            switch:
              - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}
                next: generate_import_logs_link
              - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"}
                raise: $${PYSPARK_EXTRACT_JOB_STATUS}
              - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}
                raise: $${PYSPARK_EXTRACT_JOB_STATUS}
            next: pyspark_extract_job_wait
    
        - pyspark_extract_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_pyspark_extract_job
    
        - generate_import_logs_link:
            call: sys.log
            args:
                data: $${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}
                severity: "INFO"
            next: submit_import_job
    
        - submit_import_job:
            call: http.post
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              body:
                type: IMPORT
                import_spec:
                  source_storage_uri: $${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                  entry_sync_mode: FULL
                  aspect_sync_mode: INCREMENTAL
                  log_level: $${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}
                  scope:
                    entry_groups: 
                      - $${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                    entry_types: $${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}
                    aspect_types: $${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}
            result: IMPORT_JOB_RESPONSE
            next: get_job_start_time
    
        - get_job_start_time:
            assign:
              - importJobStartTime: $${sys.now()}
            next: import_job_startup_wait
    
        - import_job_startup_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: initial_get_import_job
    
        - initial_get_import_job:
            call: http.get
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_status_available
    
        - check_import_job_status_available:
            switch:
              - condition: $${sys.now() - importJobStartTime > 300}  # 5 minutes = 300 seconds
                next: kill_import_job
              - condition: $${"status" in IMPORT_JOB_STATUS.body}
                next: check_import_job_done
            next: import_job_status_wait
    
        - import_job_status_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_import_job_status_available
    
        - check_import_job_done:
            switch:
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}
                next: the_end
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}
                raise: $${IMPORT_JOB_STATUS}
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}
                raise: $${IMPORT_JOB_STATUS}
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "FAILED"}
                raise: $${IMPORT_JOB_STATUS}
              - condition: $${sys.now() - importJobStartTime > 43200}  # 12 hours = 43200 seconds
                next: kill_import_job
            next: import_job_wait
    
        - get_import_job:
            call: http.get
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_done
    
        - import_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: get_import_job
    
        - kill_import_job:
            call: http.post
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: get_killed_import_job
    
        - get_killed_import_job:
            call: http.get
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: KILLED_IMPORT_JOB_STATUS
            next: killed
    
        - killed:
            raise: $${KILLED_IMPORT_JOB_STATUS}
    
        - the_end:
            return: $${IMPORT_JOB_STATUS}
    EOF
    

    以下を置き換えます。

    • PROJECT_ID: メタデータをインポートするターゲットの Google Cloud プロジェクトの名前。
    • LOCATION_ID: Dataproc サーバーレス ジョブとメタデータ インポート ジョブが実行され、メタデータがインポートされるターゲットの Google Cloud ロケーション。
    • SERVICE_ACCOUNT_ID: このドキュメントの必要なロールで構成したサービス アカウント。
    • CRON_SCHEDULE_EXPRESSION: パイプラインの実行スケジュールを定義する cron 式。たとえば、毎日午前 0 時にスケジュールを実行するには、式 0 0 * * * を使用します。
    • ENTRY_GROUP_ID: メタデータをインポートするエントリ グループの ID。エントリ グループ ID には、英小文字、数字、ハイフンを使用できます。

      このエントリ グループの完全なリソース名は projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID です。

    • CREATE_ENTRY_GROUP_BOOLEAN: プロジェクトにエントリ グループがまだ存在しない場合に、パイプラインでエントリ グループを作成する場合は、この値を true に設定します。
    • BUCKET_ID: コネクタによって生成されたメタデータ インポート ファイルを保存する Cloud Storage バケットの名前。ワークフローの実行ごとに新しいフォルダが作成されます。
    • ADDITIONAL_CONNECTOR_ARGUMENTS: コネクタに渡す追加の引数のリスト。例については、メタデータのインポート用にカスタム コネクタを開発するをご覧ください。各引数を二重引用符で囲み、引数をカンマで区切ります。
    • CONTAINER_IMAGE: Artifact Registry でホストされているコネクタのカスタム コンテナ イメージ。
    • ENTRY_TYPES: インポートの対象となるエントリタイプのリスト(projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID 形式)。LOCATION_ID は、メタデータをインポートする Google Cloud のロケーションと同じか、global のいずれかである必要があります。
    • ASPECT_TYPES: インポートの対象となるアスペクト タイプのリスト(projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID の形式)。LOCATION_ID は、メタデータをインポートする Google Cloud のロケーションと同じか、global のいずれかである必要があります。
    • 省略可: NETWORK_TAGS 引数にネットワーク タグのリストを指定します。
    • 省略可: NETWORK_URI 引数に、データソースに接続する VPC ネットワークの URI を指定します。ネットワークを指定する場合は、サブネットワーク引数を省略します。
    • 省略可: SUBNETWORK_URI 引数に、データソースに接続するサブネットワークの URI を指定します。サブネットを指定する場合は、ネットワーク 引数を省略します。
  3. Terraform を初期化します。

    terraform init
    
  4. .tfvars ファイルを使用して Terraform を検証します。

    terraform plan --var-file=CONNECTOR_VARIABLES_FILE.tfvars
    

    CONNECTOR_VARIABLES_FILE は、変数定義ファイルの名前に置き換えます。

  5. .tfvars ファイルを使用して Terraform をデプロイします。

    terraform apply --var-file=CONNECTOR_VARIABLES_FILE.tfvars
    

    Terraform は、指定されたプロジェクトにワークフローと Cloud Scheduler ジョブを作成します。Workflows は、指定したスケジュールでパイプラインを実行します。

    インポートするメタデータの量によっては、ワークフローの実行に数分以上かかることがあります。進行状況を確認する方法の詳細については、ワークフローの実行結果にアクセスするをご覧ください。

    パイプラインの実行が完了したら、Dataplex Catalog でインポートされたメタデータを検索できます。

ジョブのログを表示

Cloud Logging を使用して、マネージド接続パイプラインのログを表示します。ログペイロードには、関連する場合は、Dataproc Serverless バッチジョブとメタデータ インポート ジョブのログへのリンクが含まれます。詳細については、ワークフローのログを表示するをご覧ください。

トラブルシューティング

次のトラブルシューティングのヒントを参考にしてください。

次のステップ