Import metadata from a custom source using Workflows

This document describes how to import metadata from a third-party source into Dataplex Universal Catalog by running a managed connectivity pipeline in Workflows.

To set up a managed connectivity pipeline, you build a connector for your data source. You then run the pipeline in Workflows. The pipeline extracts metadata from your data source and imports the metadata into Dataplex Universal Catalog. If necessary, the pipeline also creates Dataplex Universal Catalog entry groups in your Google Cloud project.

For more information about managed connectivity, see Managed connectivity overview.

Before you begin

Before you import metadata, complete the tasks in this section.

Build a connector

A connector extracts the metadata from your data source and generates a metadata import file that Dataplex Universal Catalog can import. The connector is an Artifact Registry image that can be run on Dataproc Serverless.
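
For example, after you build the connector, you might push its container image to Artifact Registry with Docker and the gcloud CLI. The following is a minimal sketch; the repository name, image name, and tag are placeholder values:

    # Authenticate Docker to Artifact Registry in the repository's location (placeholder values).
    gcloud auth configure-docker LOCATION-docker.pkg.dev

    # Build the connector image and push it to Artifact Registry.
    docker build -t LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/custom-connector:0.1 .
    docker push LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/custom-connector:0.1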

Configure Google Cloud resources

  1. Enable the Workflows, Dataproc, Cloud Storage, Dataplex Universal Catalog, Secret Manager, Artifact Registry, and Cloud Scheduler APIs.

    Enable the APIs

    If you don't plan to run the pipeline on a schedule, you don't need to enable the Cloud Scheduler API.

  2. Create a secret in Secret Manager to store the credentials for your third-party data source (for example gcloud commands, see the sketch after this list).

  3. Configure a Virtual Private Cloud (VPC) network to run Dataproc Serverless for Spark workloads.

  4. Create a Cloud Storage bucket to store the metadata import files.

  5. Create the following Dataplex Universal Catalog resources:

    1. Create custom aspect types for the entries that you want to import.

    2. Create custom entry types for the entries that you want to import.
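
As a reference point, the following sketch shows how steps 1, 2, and 4 in the preceding list might be done with the gcloud CLI. The secret name, bucket name, and location are placeholder values; adjust them for your environment:

    # Step 1: enable the required APIs (omit cloudscheduler.googleapis.com if you won't schedule the pipeline).
    gcloud services enable workflows.googleapis.com dataproc.googleapis.com \
        storage.googleapis.com dataplex.googleapis.com secretmanager.googleapis.com \
        artifactregistry.googleapis.com cloudscheduler.googleapis.com

    # Step 2: create a secret and add the data source credentials as a secret version.
    gcloud secrets create connector-credentials
    gcloud secrets versions add connector-credentials --data-file=credentials.json

    # Step 4: create a Cloud Storage bucket for the metadata import files.
    gcloud storage buckets create gs://BUCKET_ID --location=LOCATION_ID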

Required roles

A service account represents the identity of a workflow, and it determines what permissions the workflow has and which Google Cloud resources it can access. You need a service account for Workflows (to run the pipeline) and for Dataproc Serverless (to run the connector).

You can use the Compute Engine default service account (PROJECT_NUMBER-compute@developer.gserviceaccount.com), or create your own service account (or accounts) to run the managed connectivity pipeline.
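
For example, to create a dedicated service account with the gcloud CLI (a sketch; the account name is a placeholder):

    # Create a service account to run the managed connectivity pipeline.
    gcloud iam service-accounts create dataplex-connector-sa \
        --display-name="Managed connectivity pipeline"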

Console

  1. In the Google Cloud console, go to the IAM page.

    Go to IAM

  2. Select the project that you want to import metadata into.

  3. Click Grant access, and then enter the service account's email address.

  4. Assign the following roles to the service account:

    • Logs Writer
    • Dataplex Entry Group Owner
    • Dataplex Metadata Job Owner
    • Dataplex Catalog Editor
    • Dataproc Editor
    • Dataproc Worker
    • Secret Manager Secret Accessor: on the secret that stores the credentials for your data source
    • Storage Object User: on the Cloud Storage bucket
    • Artifact Registry Reader: on the Artifact Registry repository that contains the connector image
    • Service Account User: if you use different service accounts, grant this role to the service account that runs Workflows, on the service account that runs the Dataproc Serverless batch jobs
    • Workflows Invoker: if you want to schedule the pipeline
  5. Save your changes.

gcloud

  1. Grant roles to the service account. Run the following commands:

    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/logging.logWriter
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataplex.entryGroupOwner
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataplex.metadataJobOwner
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataplex.catalogEditor
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataproc.editor
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataproc.worker
    

    Replace the following:

    • PROJECT_ID: the name of the target Google Cloud project to import metadata into.
    • SERVICE_ACCOUNT_ID: the service account, such as my-service-account@my-project.iam.gserviceaccount.com.
  2. Grant the service account the following roles at the resource level:

    gcloud secrets add-iam-policy-binding SECRET_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/secretmanager.secretaccessor
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/storage.objectUser \
        --condition=resource.name.startsWith('projects/_/buckets/BUCKET_ID')
    gcloud artifacts repositories add-iam-policy-binding REPOSITORY \
        --location=REPOSITORY_LOCATION \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/artifactregistry.reader
    

    Replace the following:

    • SECRET_ID: the ID of the secret that stores the credentials for your data source. Use the format projects/PROJECT_ID/secrets/SECRET_ID.
    • BUCKET_ID: the name of the Cloud Storage bucket.
    • REPOSITORY: the Artifact Registry repository that contains the connector image.
    • REPOSITORY_LOCATION: the Google Cloud location where the repository is hosted.
  3. Grant the service account that runs Workflows the roles/iam.serviceAccountUser role on the service account that runs the Dataproc Serverless batch jobs. You must grant this role even if you use the same service account for both Workflows and Dataproc Serverless.

    gcloud iam service-accounts add-iam-policy-binding \
        SERVICE_ACCOUNT_ID \
        --member='serviceAccount:SERVICE_ACCOUNT_ID' \
        --role='roles/iam.serviceAccountUser'
    

    If you use different service accounts, the value of the --member flag is the service account that runs the Dataproc Serverless batch jobs.

  4. If you want to schedule the pipeline, grant the service account the following role:

    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/workflows.invoker
    

Import metadata

To import metadata, create and run a workflow that runs the managed connectivity pipeline. Optionally, you can also create a schedule for running the pipeline.

Console

  1. Create a workflow. Provide the following information:

    • Service account: the service account that you configured in the Required roles section of this document.
    • Encryption: select Google-managed encryption key.

    • Define workflow: provide the following definition file:

      main:
        params: [args]
        steps:
          - init:
              assign:
              - WORKFLOW_ID: ${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
              - NETWORK_URI: ${default(map.get(args, "NETWORK_URI"), "")}
              - SUBNETWORK_URI: ${default(map.get(args, "SUBNETWORK_URI"), "")}
              - NETWORK_TAGS: ${default(map.get(args, "NETWORK_TAGS"), [])}
      
          - check_networking:
              switch:
                - condition: ${NETWORK_URI != "" and SUBNETWORK_URI != ""}
                  raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one."
                - condition: ${NETWORK_URI == "" and SUBNETWORK_URI == ""}
                  steps:
                   - submit_extract_job_with_default_network_uri:
                        assign:
                          - NETWORK_TYPE: "networkUri"
                          - NETWORKING: ${"projects/" + args.TARGET_PROJECT_ID + "/global/networks/default"}  
                - condition: ${NETWORK_URI != ""}
                  steps:
                    - submit_extract_job_with_network_uri:
                        assign:
                          - NETWORKING: ${NETWORK_URI}
                          - NETWORK_TYPE: "networkUri"
                - condition: ${SUBNETWORK_URI != ""}
                  steps:
                    - submit_extract_job_with_subnetwork_uri:
                        assign:
                          - NETWORKING: ${SUBNETWORK_URI}
                          - NETWORK_TYPE: "subnetworkUri"
              next: check_create_target_entry_group
      
          - check_create_target_entry_group:
              switch:
                - condition: ${args.CREATE_TARGET_ENTRY_GROUP == true}
                  next: create_target_entry_group
                - condition: ${args.CREATE_TARGET_ENTRY_GROUP == false}
                  next: prepare_pyspark_job_body
      
          - create_target_entry_group:
              call: http.post
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              next: prepare_pyspark_job_body
      
          - prepare_pyspark_job_body:
              assign:
                - pyspark_batch_body:
                    mainPythonFileUri: file:///main.py
                    args:
                      - ${"--target_project_id=" + args.TARGET_PROJECT_ID}
                      - ${"--target_location_id=" + args.CLOUD_REGION}
                      - ${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                      - ${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                      - ${"--output_folder=" + WORKFLOW_ID}
                      - ${args.ADDITIONAL_CONNECTOR_ARGS}
              next: add_jar_file_uri_if_present
      
          - add_jar_file_uri_if_present:
              switch:
                - condition: ${args.JAR_FILE_URI != "" and args.JAR_FILE_URI != null}
                  assign:
                    - pyspark_batch_body.jarFileUris : ${args.JAR_FILE_URI}
              next: generate_extract_job_link
      
          - generate_extract_job_link:
              call: sys.log
              args:
                  data: ${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}
                  severity: "INFO"
              next: submit_pyspark_extract_job
      
          - submit_pyspark_extract_job:
              call: http.post
              args:
                url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
                headers:
                  Content-Type: "application/json"
                query:
                  batchId: ${WORKFLOW_ID}
                body:
                  pysparkBatch: ${pyspark_batch_body}
                  runtimeConfig:
                      containerImage: ${args.CUSTOM_CONTAINER_IMAGE}
                  environmentConfig:
                      executionConfig:
                          serviceAccount: ${args.SERVICE_ACCOUNT}
                          stagingBucket: ${args.CLOUD_STORAGE_BUCKET_ID}
                          ${NETWORK_TYPE}: ${NETWORKING}
                          networkTags: ${NETWORK_TAGS}
              result: RESPONSE_MESSAGE
              next: check_pyspark_extract_job
      
          - check_pyspark_extract_job:
              call: http.get
              args:
                url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: PYSPARK_EXTRACT_JOB_STATUS
              next: check_pyspark_extract_job_done
      
          - check_pyspark_extract_job_done:
              switch:
                - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}
                  next: generate_import_logs_link
                - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"}
                  raise: ${PYSPARK_EXTRACT_JOB_STATUS}
                - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}
                  raise: ${PYSPARK_EXTRACT_JOB_STATUS}
              next: pyspark_extract_job_wait
      
          - pyspark_extract_job_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: check_pyspark_extract_job
      
          - generate_import_logs_link:
              call: sys.log
              args:
                  data: ${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}
                  severity: "INFO"
              next: submit_import_job
      
          - submit_import_job:
              call: http.post
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
                body:
                  type: IMPORT
                  import_spec:
                    source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                    entry_sync_mode: FULL
                    aspect_sync_mode: INCREMENTAL
                    log_level: ${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}
                    scope:
                      entry_groups: 
                        - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                      entry_types: ${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}
                      aspect_types: ${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}
              result: IMPORT_JOB_RESPONSE
              next: get_job_start_time
      
          - get_job_start_time:
              assign:
                - importJobStartTime: ${sys.now()}
              next: import_job_startup_wait
      
          - import_job_startup_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: initial_get_import_job
      
          - initial_get_import_job:
              call: http.get
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: IMPORT_JOB_STATUS
              next: check_import_job_status_available
      
          - check_import_job_status_available:
              switch:
                - condition: ${sys.now() - importJobStartTime > 300}  # 5 minutes = 300 seconds
                  next: kill_import_job
                - condition: ${"status" in IMPORT_JOB_STATUS.body}
                  next: check_import_job_done
              next: import_job_status_wait
      
          - import_job_status_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: check_import_job_status_available
      
          - check_import_job_done:
              switch:
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}
                  next: the_end
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}
                  raise: ${IMPORT_JOB_STATUS}
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}
                  raise: ${IMPORT_JOB_STATUS}
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "FAILED"}
                  raise: ${IMPORT_JOB_STATUS}
                - condition: ${sys.now() - importJobStartTime > 43200}  # 12 hours = 43200 seconds
                  next: kill_import_job
              next: import_job_wait
      
          - get_import_job:
              call: http.get
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: IMPORT_JOB_STATUS
              next: check_import_job_done
      
          - import_job_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: get_import_job
      
          - kill_import_job:
              call: http.post
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              next: get_killed_import_job
      
          - get_killed_import_job:
              call: http.get
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: KILLED_IMPORT_JOB_STATUS
              next: killed
      
          - killed:
              raise: ${KILLED_IMPORT_JOB_STATUS}
      
          - the_end:
              return: ${IMPORT_JOB_STATUS}
      
  2. To run the pipeline on demand, execute the workflow.

    Provide the following runtime arguments:

    {
        "TARGET_PROJECT_ID": "PROJECT_ID",
        "CLOUD_REGION": "LOCATION_ID",
        "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID",
        "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN,
        "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID",
        "SERVICE_ACCOUNT": "SERVICE_ACCOUNT_ID",
        "ADDITIONAL_CONNECTOR_ARGS": [ADDITIONAL_CONNECTOR_ARGUMENTS],
        "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE",
        "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES],
        "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES],
        "IMPORT_JOB_LOG_LEVEL": "INFO",
        "JAR_FILE_URI": "",
        "NETWORK_TAGS": [],
        "NETWORK_URI": "",
        "SUBNETWORK_URI": ""
     }
    

    Replace the following:

    • PROJECT_ID: the name of the target Google Cloud project to import metadata into.
    • LOCATION_ID: the target Google Cloud location where the Dataproc Serverless and metadata import jobs run, and where metadata is imported.
    • ENTRY_GROUP_ID: the ID of the entry group to import metadata into. The entry group ID can contain lowercase letters, numbers, and hyphens.

      The full resource name of this entry group is projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID.

    • CREATE_ENTRY_GROUP_BOOLEAN: if you want the pipeline to create the entry group if it doesn't already exist in your project, set this value to true.
    • BUCKET_ID: the name of the Cloud Storage bucket to store the metadata import file that is generated by the connector. Each workflow execution creates a new folder.
    • SERVICE_ACCOUNT_ID: the service account that you configured in the Required roles section of this document. The service account runs the connector in Dataproc Serverless.
    • ADDITIONAL_CONNECTOR_ARGUMENTS: a list of additional arguments to pass to the connector. For examples, see Develop a custom connector for metadata import. Enclose each argument in double quotation marks, and separate the arguments with commas.
    • CONTAINER_IMAGE: the custom container image of the connector, hosted in Artifact Registry.
    • ENTRY_TYPES: a list of entry types that are in scope for import, in the format projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID. The LOCATION_ID must be either the same Google Cloud location that you import metadata into, or global.
    • ASPECT_TYPES: a list of aspect types that are in scope for import, in the format projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID. The LOCATION_ID must be either the same Google Cloud location that you import metadata into, or global.
    • Optional: for the NETWORK_TAGS argument, provide a list of network tags.
    • Optional: for the NETWORK_URI argument, provide the URI of the VPC network that connects to the data source. If you provide a network, omit the subnetwork argument.
    • Optional: for the SUBNETWORK_URI argument, provide the URI of the subnetwork that connects to the data source. If you provide a subnetwork, omit the network argument.

    Depending on the amount of metadata that you import, the pipeline might take several minutes or longer to run. For more information about how to view the progress, see Access workflow execution results.

    After the pipeline has finished running, you can search for the imported metadata in Dataplex Universal Catalog.

  3. Optional: If you want to run the pipeline on a schedule, create a schedule by using Cloud Scheduler. Provide the following information:

    • Frequency: a unix-cron expression that defines the schedule to run the pipeline.
    • Workflow argument: the runtime arguments for the connector, as described in the previous step.
    • Service account: the service account that manages the scheduler.

gcloud

  1. Save the following workflow definition as a YAML file:

    main:
      params: [args]
      steps:
        - init:
            assign:
            - WORKFLOW_ID: ${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
            - NETWORK_URI: ${default(map.get(args, "NETWORK_URI"), "")}
            - SUBNETWORK_URI: ${default(map.get(args, "SUBNETWORK_URI"), "")}
            - NETWORK_TAGS: ${default(map.get(args, "NETWORK_TAGS"), [])}
    
        - check_networking:
            switch:
              - condition: ${NETWORK_URI != "" and SUBNETWORK_URI != ""}
                raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one."
              - condition: ${NETWORK_URI == "" and SUBNETWORK_URI == ""}
                steps:
                 - submit_extract_job_with_default_network_uri:
                      assign:
                        - NETWORK_TYPE: "networkUri"
                        - NETWORKING: ${"projects/" + args.TARGET_PROJECT_ID + "/global/networks/default"}  
              - condition: ${NETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_network_uri:
                      assign:
                        - NETWORKING: ${NETWORK_URI}
                        - NETWORK_TYPE: "networkUri"
              - condition: ${SUBNETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_subnetwork_uri:
                      assign:
                        - NETWORKING: ${SUBNETWORK_URI}
                        - NETWORK_TYPE: "subnetworkUri"
            next: check_create_target_entry_group
    
        - check_create_target_entry_group:
            switch:
              - condition: ${args.CREATE_TARGET_ENTRY_GROUP == true}
                next: create_target_entry_group
              - condition: ${args.CREATE_TARGET_ENTRY_GROUP == false}
                next: prepare_pyspark_job_body
    
        - create_target_entry_group:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: prepare_pyspark_job_body
    
        - prepare_pyspark_job_body:
            assign:
              - pyspark_batch_body:
                  mainPythonFileUri: file:///main.py
                  args:
                    - ${"--target_project_id=" + args.TARGET_PROJECT_ID}
                    - ${"--target_location_id=" + args.CLOUD_REGION}
                    - ${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                    - ${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                    - ${"--output_folder=" + WORKFLOW_ID}
                    - ${args.ADDITIONAL_CONNECTOR_ARGS}
            next: add_jar_file_uri_if_present
    
        - add_jar_file_uri_if_present:
            switch:
              - condition: ${args.JAR_FILE_URI != "" and args.JAR_FILE_URI != null}
                assign:
                  - pyspark_batch_body.jarFileUris : ${args.JAR_FILE_URI}
            next: generate_extract_job_link
    
        - generate_extract_job_link:
            call: sys.log
            args:
                data: ${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}
                severity: "INFO"
            next: submit_pyspark_extract_job
    
        - submit_pyspark_extract_job:
            call: http.post
            args:
              url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              headers:
                Content-Type: "application/json"
              query:
                batchId: ${WORKFLOW_ID}
              body:
                pysparkBatch: ${pyspark_batch_body}
                runtimeConfig:
                    containerImage: ${args.CUSTOM_CONTAINER_IMAGE}
                environmentConfig:
                    executionConfig:
                        serviceAccount: ${args.SERVICE_ACCOUNT}
                        stagingBucket: ${args.CLOUD_STORAGE_BUCKET_ID}
                        ${NETWORK_TYPE}: ${NETWORKING}
                        networkTags: ${NETWORK_TAGS}
            result: RESPONSE_MESSAGE
            next: check_pyspark_extract_job
    
        - check_pyspark_extract_job:
            call: http.get
            args:
              url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: PYSPARK_EXTRACT_JOB_STATUS
            next: check_pyspark_extract_job_done
    
        - check_pyspark_extract_job_done:
            switch:
              - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}
                next: generate_import_logs_link
              - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"}
                raise: ${PYSPARK_EXTRACT_JOB_STATUS}
              - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}
                raise: ${PYSPARK_EXTRACT_JOB_STATUS}
            next: pyspark_extract_job_wait
    
        - pyspark_extract_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_pyspark_extract_job
    
        - generate_import_logs_link:
            call: sys.log
            args:
                data: ${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}
                severity: "INFO"
            next: submit_import_job
    
        - submit_import_job:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              body:
                type: IMPORT
                import_spec:
                  source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                  entry_sync_mode: FULL
                  aspect_sync_mode: INCREMENTAL
                  log_level: ${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}
                  scope:
                    entry_groups: 
                      - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                    entry_types: ${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}
                    aspect_types: ${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}
            result: IMPORT_JOB_RESPONSE
            next: get_job_start_time
    
        - get_job_start_time:
            assign:
              - importJobStartTime: ${sys.now()}
            next: import_job_startup_wait
    
        - import_job_startup_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: initial_get_import_job
    
        - initial_get_import_job:
            call: http.get
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_status_available
    
        - check_import_job_status_available:
            switch:
              - condition: ${sys.now() - importJobStartTime > 300}  # 5 minutes = 300 seconds
                next: kill_import_job
              - condition: ${"status" in IMPORT_JOB_STATUS.body}
                next: check_import_job_done
            next: import_job_status_wait
    
        - import_job_status_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_import_job_status_available
    
        - check_import_job_done:
            switch:
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}
                next: the_end
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}
                raise: ${IMPORT_JOB_STATUS}
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}
                raise: ${IMPORT_JOB_STATUS}
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "FAILED"}
                raise: ${IMPORT_JOB_STATUS}
              - condition: ${sys.now() - importJobStartTime > 43200}  # 12 hours = 43200 seconds
                next: kill_import_job
            next: import_job_wait
    
        - get_import_job:
            call: http.get
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_done
    
        - import_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: get_import_job
    
        - kill_import_job:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: get_killed_import_job
    
        - get_killed_import_job:
            call: http.get
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: KILLED_IMPORT_JOB_STATUS
            next: killed
    
        - killed:
            raise: ${KILLED_IMPORT_JOB_STATUS}
    
        - the_end:
            return: ${IMPORT_JOB_STATUS}
    
  2. Define Bash variables, create the workflow, and optionally create a schedule to run the pipeline:

    # Define Bash variables (replace with your actual values)
    project_id="PROJECT_ID"
    region="LOCATION_ID"
    service_account="SERVICE_ACCOUNT_ID"
    workflow_source="WORKFLOW_DEFINITION_FILE.yaml"
    workflow_name="WORKFLOW_NAME"
    workflow_args='WORKFLOW_ARGUMENTS'
    
    # Create Workflows resource
    gcloud workflows deploy ${workflow_name} \
      --project=${project_id} \
      --location=${region} \
      --source=${workflow_source} \
      --service-account=${service_account}
    
    # Create Cloud Scheduler job
    gcloud scheduler jobs create http ${workflow_name}-scheduler \
      --project=${project_id} \
      --location=${region} \
      --schedule="CRON_SCHEDULE_EXPRESSION" \
      --time-zone="UTC" \
      --uri="https://workflowexecutions.googleapis.com/v1/projects/${project_id}/locations/${region}/workflows/${workflow_name}/executions" \
      --http-method="POST" \
      --oauth-service-account-email=${service_account} \
      --headers="Content-Type=application/json" \
      --message-body='{"argument": ${workflow_args}}'
    

    Replace the following:

    • PROJECT_ID: the name of the target Google Cloud project to import metadata into.
    • LOCATION_ID: the target Google Cloud location where the Dataproc Serverless and metadata import jobs run, and where metadata is imported.
    • SERVICE_ACCOUNT_ID: the service account that you configured in the Required roles section of this document.
    • WORKFLOW_DEFINITION_FILE: the path to the workflow definition YAML file.
    • WORKFLOW_NAME: the name of the workflow.
    • WORKFLOW_ARGUMENTS: the runtime arguments to pass to the connector. The arguments are in JSON format:

      {
          "TARGET_PROJECT_ID": "PROJECT_ID",
          "CLOUD_REGION": "LOCATION_ID",
          "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID",
          "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN,
          "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID",
          "SERVICE_ACCOUNT": "SERVICE_ACCOUNT_ID",
          "ADDITIONAL_CONNECTOR_ARGS": [ADDITIONAL_CONNECTOR_ARGUMENTS],
          "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE",
          "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES],
          "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES],
          "IMPORT_JOB_LOG_LEVEL": "INFO",
          "JAR_FILE_URI": "",
          "NETWORK_TAGS": [],
          "NETWORK_URI": "",
          "SUBNETWORK_URI": ""
       }
      

      For Cloud Scheduler, escape the double quotation marks inside the quoted string by using backslashes (\). For example: --message-body="{\"argument\": \"{\\\"key\\\": \\\"value\\\"}\"}"

      Replace the following:

      • ENTRY_GROUP_ID: the ID of the entry group to import metadata into. The entry group ID can contain lowercase letters, numbers, and hyphens.

        The full resource name of this entry group is projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID.

      • CREATE_ENTRY_GROUP_BOOLEAN: if you want the pipeline to create the entry group if it doesn't already exist in your project, set this value to true.
      • BUCKET_ID: the name of the Cloud Storage bucket to store the metadata import file that is generated by the connector. Each workflow execution creates a new folder.
      • ADDITIONAL_CONNECTOR_ARGUMENTS: a list of additional arguments to pass to the connector. For examples, see Develop a custom connector for metadata import.
      • CONTAINER_IMAGE: the custom container image of the connector, hosted in Artifact Registry.
      • ENTRY_TYPES: a list of entry types that are in scope for import, in the format projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID. The LOCATION_ID must be either the same Google Cloud location that you import metadata into, or global.
      • ASPECT_TYPES: a list of aspect types that are in scope for import, in the format projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID. The LOCATION_ID must be either the same Google Cloud location that you import metadata into, or global.
      • Optional: for the NETWORK_TAGS argument, provide a list of network tags.
      • Optional: for the NETWORK_URI argument, provide the URI of the VPC network that connects to the data source. If you provide a network, omit the subnetwork argument.
      • Optional: for the SUBNETWORK_URI argument, provide the URI of the subnetwork that connects to the data source. If you provide a subnetwork, omit the network argument.
    • CRON_SCHEDULE_EXPRESSION: a cron expression that defines the schedule to run the pipeline. For example, to run the schedule at midnight every day, use the expression 0 0 * * *.

  3. To run the pipeline on demand, execute the workflow:

    workflow_name="WORKFLOW_NAME"
    workflow_args='WORKFLOW_ARGUMENTS'
    
    gcloud workflows run "${workflow_name}" --project=${project_id} --location=${region} --data="${workflow_args}"
    

    The workflow arguments are in JSON format, but they aren't escaped. A hypothetical filled-in example is shown after these steps.

    Depending on the amount of metadata that you import, the workflow might take several minutes or longer to run. For more information about how to view the progress, see Access workflow execution results.

    After the pipeline has finished running, you can search for the imported metadata in Dataplex Universal Catalog.
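
    For illustration, a hypothetical filled-in value for the workflow_args variable might look like the following. All of the values are placeholders, not defaults; replace them with values for your environment:

    workflow_args='{
        "TARGET_PROJECT_ID": "my-project",
        "CLOUD_REGION": "us-central1",
        "TARGET_ENTRY_GROUP_ID": "my-entry-group",
        "CREATE_TARGET_ENTRY_GROUP": true,
        "CLOUD_STORAGE_BUCKET_ID": "my-metadata-bucket",
        "SERVICE_ACCOUNT": "my-service-account@my-project.iam.gserviceaccount.com",
        "ADDITIONAL_CONNECTOR_ARGS": [],
        "CUSTOM_CONTAINER_IMAGE": "us-central1-docker.pkg.dev/my-project/my-repo/custom-connector:0.1",
        "IMPORT_JOB_SCOPE_ENTRY_TYPES": ["projects/my-project/locations/us-central1/entryTypes/my-entry-type"],
        "IMPORT_JOB_SCOPE_ASPECT_TYPES": ["projects/my-project/locations/us-central1/aspectTypes/my-aspect-type"],
        "IMPORT_JOB_LOG_LEVEL": "INFO",
        "JAR_FILE_URI": "",
        "NETWORK_TAGS": [],
        "NETWORK_URI": "",
        "SUBNETWORK_URI": ""
    }'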

Terraform

  1. Clone the cloud-dataplex repository.

    The repository includes the Terraform files for the managed connectivity pipeline.

  2. Edit the .tfvars file to replace the placeholders with information for your connector.

    project_id                      = "PROJECT_ID"
    region                          = "LOCATION_ID"
    service_account                 = "SERVICE_ACCOUNT_ID"
    cron_schedule                   = "CRON_SCHEDULE_EXPRESSION"
    workflow_args                   = {"TARGET_PROJECT_ID": "PROJECT_ID", "CLOUD_REGION": "LOCATION_ID", "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID", "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN, "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID", "SERVICE_ACCOUNT": "SERVICE_ACCOUNT_ID", "ADDITIONAL_CONNECTOR_ARGS": [ADDITIONAL_CONNECTOR_ARGUMENTS], "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE", "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES], "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES], "IMPORT_JOB_LOG_LEVEL": "INFO", "NETWORK_TAGS": [], "NETWORK_URI": "", "SUBNETWORK_URI": ""}
    
    
    workflow_source                 = <<EOF
    main:
      params: [args]
      steps:
        - init:
            assign:
            - WORKFLOW_ID: $${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
            - NETWORK_URI: $${default(map.get(args, "NETWORK_URI"), "")}
            - SUBNETWORK_URI: $${default(map.get(args, "SUBNETWORK_URI"), "")}
            - NETWORK_TAGS: $${default(map.get(args, "NETWORK_TAGS"), [])}
    
        - check_networking:
            switch:
              - condition: $${NETWORK_URI != "" and SUBNETWORK_URI != ""}
                raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one."
              - condition: $${NETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_network_uri:
                      assign:
                        - NETWORKING: $${NETWORK_URI}
                        - NETWORK_TYPE: "networkUri"
              - condition: $${SUBNETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_subnetwork_uri:
                      assign:
                        - NETWORKING: $${SUBNETWORK_URI}
                        - NETWORK_TYPE: "subnetworkUri"
            next: set_default_networking
    
        - set_default_networking:
            assign:
              - NETWORK_TYPE: "networkUri"
              - NETWORKING: $${"projects/" + args.TARGET_PROJECT_ID + "/global/networks/default"}
            next: check_create_target_entry_group
    
        - check_create_target_entry_group:
            switch:
              - condition: $${args.CREATE_TARGET_ENTRY_GROUP == true}
                next: create_target_entry_group
              - condition: $${args.CREATE_TARGET_ENTRY_GROUP == false}
                next: generate_extract_job_link
    
        - create_target_entry_group:
            call: http.post
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: generate_extract_job_link
    
        - generate_extract_job_link:
            call: sys.log
            args:
                data: $${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}
                severity: "INFO"
            next: submit_pyspark_extract_job
    
        - submit_pyspark_extract_job:
            call: http.post
            args:
              url: $${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              headers:
                Content-Type: "application/json"
              query:
                batchId: $${WORKFLOW_ID}
              body:
                pysparkBatch:
                  mainPythonFileUri: file:///main.py
                  args:
                    - $${"--target_project_id=" + args.TARGET_PROJECT_ID}
                    - $${"--target_location_id=" + args.CLOUD_REGION}
                    - $${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                    - $${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                    - $${"--output_folder=" + WORKFLOW_ID}
                    - $${args.ADDITIONAL_CONNECTOR_ARGS}
                runtimeConfig:
                    containerImage: $${args.CUSTOM_CONTAINER_IMAGE}
                environmentConfig:
                    executionConfig:
                        serviceAccount: $${args.SERVICE_ACCOUNT}
                        stagingBucket: $${args.CLOUD_STORAGE_BUCKET_ID}
                        $${NETWORK_TYPE}: $${NETWORKING}
                        networkTags: $${NETWORK_TAGS}
            result: RESPONSE_MESSAGE
            next: check_pyspark_extract_job
    
        - check_pyspark_extract_job:
            call: http.get
            args:
              url: $${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: PYSPARK_EXTRACT_JOB_STATUS
            next: check_pyspark_extract_job_done
    
        - check_pyspark_extract_job_done:
            switch:
              - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}
                next: generate_import_logs_link
              - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"}
                raise: $${PYSPARK_EXTRACT_JOB_STATUS}
              - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}
                raise: $${PYSPARK_EXTRACT_JOB_STATUS}
            next: pyspark_extract_job_wait
    
        - pyspark_extract_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_pyspark_extract_job
    
        - generate_import_logs_link:
            call: sys.log
            args:
                data: $${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}
                severity: "INFO"
            next: submit_import_job
    
        - submit_import_job:
            call: http.post
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              body:
                type: IMPORT
                import_spec:
                  source_storage_uri: $${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                  entry_sync_mode: FULL
                  aspect_sync_mode: INCREMENTAL
                  log_level: $${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}
                  scope:
                    entry_groups: 
                      - $${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                    entry_types: $${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}
                    aspect_types: $${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}
            result: IMPORT_JOB_RESPONSE
            next: get_job_start_time
    
        - get_job_start_time:
            assign:
              - importJobStartTime: $${sys.now()}
            next: import_job_startup_wait
    
        - import_job_startup_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: initial_get_import_job
    
        - initial_get_import_job:
            call: http.get
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_status_available
    
        - check_import_job_status_available:
            switch:
              - condition: $${sys.now() - importJobStartTime > 300}  # 5 minutes = 300 seconds
                next: kill_import_job
              - condition: $${"status" in IMPORT_JOB_STATUS.body}
                next: check_import_job_done
            next: import_job_status_wait
    
        - import_job_status_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_import_job_status_available
    
        - check_import_job_done:
            switch:
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}
                next: the_end
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}
                raise: $${IMPORT_JOB_STATUS}
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}
                raise: $${IMPORT_JOB_STATUS}
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "FAILED"}
                raise: $${IMPORT_JOB_STATUS}
              - condition: $${sys.now() - importJobStartTime > 43200}  # 12 hours = 43200 seconds
                next: kill_import_job
            next: import_job_wait
    
        - get_import_job:
            call: http.get
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_done
    
        - import_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: get_import_job
    
        - kill_import_job:
            call: http.post
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: get_killed_import_job
    
        - get_killed_import_job:
            call: http.get
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: KILLED_IMPORT_JOB_STATUS
            next: killed
    
        - killed:
            raise: $${KILLED_IMPORT_JOB_STATUS}
    
        - the_end:
            return: $${IMPORT_JOB_STATUS}
    EOF
    

    Replace the following:

    • PROJECT_ID: the name of the target Google Cloud project to import metadata into.
    • LOCATION_ID: the target Google Cloud location where the Dataproc Serverless and metadata import jobs run, and where metadata is imported.
    • SERVICE_ACCOUNT_ID: the service account that you configured in the Required roles section of this document.
    • CRON_SCHEDULE_EXPRESSION: a cron expression that defines the schedule to run the pipeline. For example, to run the schedule at midnight every day, use the expression 0 0 * * *.
    • ENTRY_GROUP_ID: the ID of the entry group to import metadata into. The entry group ID can contain lowercase letters, numbers, and hyphens.

      The full resource name of this entry group is projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID.

    • CREATE_ENTRY_GROUP_BOOLEAN: if you want the pipeline to create the entry group if it doesn't already exist in your project, set this value to true.
    • BUCKET_ID: the name of the Cloud Storage bucket to store the metadata import file that is generated by the connector. Each workflow execution creates a new folder.
    • ADDITIONAL_CONNECTOR_ARGUMENTS: a list of additional arguments to pass to the connector. For examples, see Develop a custom connector for metadata import. Enclose each argument in double quotation marks, and separate the arguments with commas.
    • CONTAINER_IMAGE: the custom container image of the connector, hosted in Artifact Registry.
    • ENTRY_TYPES: a list of entry types that are in scope for import, in the format projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID. The LOCATION_ID must be either the same Google Cloud location that you import metadata into, or global.
    • ASPECT_TYPES: a list of aspect types that are in scope for import, in the format projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID. The LOCATION_ID must be either the same Google Cloud location that you import metadata into, or global.
    • Optional: for the NETWORK_TAGS argument, provide a list of network tags.
    • Optional: for the NETWORK_URI argument, provide the URI of the VPC network that connects to the data source. If you provide a network, omit the subnetwork argument.
    • Optional: for the SUBNETWORK_URI argument, provide the URI of the subnetwork that connects to the data source. If you provide a subnetwork, omit the network argument.
  3. Initialize Terraform:

    terraform init
    
  4. Validate Terraform with your .tfvars file:

    terraform plan --var-file=CONNECTOR_VARIABLES_FILE.tfvars
    

    Replace CONNECTOR_VARIABLES_FILE with the name of your variable definitions file.

  5. Deploy Terraform with your .tfvars file:

    terraform apply --var-file=CONNECTOR_VARIABLES_FILE.tfvars
    

    Terraform creates a workflow and a Cloud Scheduler job in the specified project. Workflows runs the pipeline at the schedule that you specify.

    Depending on the amount of metadata that you import, the workflow might take several minutes or longer to run. For more information about how to view the progress, see Access workflow execution results.

    After the pipeline has finished running, you can search for the imported metadata in Dataplex Universal Catalog.
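
    If you later want to remove the scheduled pipeline, you can tear down the resources that Terraform created by using the same variables file (a sketch, assuming no other resources are managed in this Terraform configuration):

    terraform destroy --var-file=CONNECTOR_VARIABLES_FILE.tfvars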

View job logs

Use Cloud Logging to view logs for a managed connectivity pipeline. The log payload includes a link to the logs for the Dataproc Serverless batch job and the metadata import job, as relevant. For more information, see View workflow logs.
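
You can also check the state and result of individual pipeline runs from the command line. The following sketch uses the gcloud CLI; the workflow name, location, and execution ID are placeholders:

    # List recent executions of the workflow.
    gcloud workflows executions list WORKFLOW_NAME --location=LOCATION_ID --limit=5

    # Describe a specific execution, including its final result or error.
    gcloud workflows executions describe EXECUTION_ID \
        --workflow=WORKFLOW_NAME --location=LOCATION_ID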

Troubleshooting

Consider the following troubleshooting suggestions:

What's next