Metadaten mithilfe von Workflows aus einer benutzerdefinierten Quelle importieren

In diesem Dokument wird beschrieben, wie Sie Metadaten aus einer Drittanbieterquelle in Dataplex Universal Catalog importieren, indem Sie eine Pipeline für verwaltete Verbindungen in Workflows ausführen.

Um eine Pipeline für verwaltete Verbindungen einzurichten, erstellen Sie einen Connector für Ihre Datenquelle. Anschließend führen Sie die Pipeline in Workflows aus. Die Pipeline extrahiert Metadaten aus Ihrer Datenquelle und importiert sie dann in Dataplex Universal Catalog. Bei Bedarf werden mit der Pipeline auch Dataplex Universal Catalog-Eintragsgruppen in Ihrem Google Cloud -Projekt erstellt.

Weitere Informationen zu verwalteten Verbindungen finden Sie unter Übersicht über verwaltete Verbindungen.

Hinweise

Führen Sie die Aufgaben in diesem Abschnitt aus, bevor Sie Metadaten importieren.

Connector erstellen

Ein Connector extrahiert die Metadaten aus Ihrer Datenquelle und generiert eine Metadaten-Importdatei, die von Dataplex Universal Catalog importiert werden kann. Der Connector ist ein Artifact Registry-Image, das auf Dataproc Serverless ausgeführt werden kann.

Google Cloud -Ressourcen konfigurieren

  1. Enable the Workflows, Dataproc, Cloud Storage, Dataplex Universal Catalog, Secret Manager, Artifact Registry, and Cloud Scheduler APIs.

    Enable the APIs

    Wenn Sie die Pipeline nicht nach einem Zeitplan ausführen möchten, müssen Sie die Cloud Scheduler API nicht aktivieren.

  2. Erstellen Sie Secrets in Secret Manager, um die Anmeldedaten für Ihre Drittanbieterdatenquelle zu speichern.

  3. VPC-Netzwerk (Virtual Private Cloud) konfigurieren, um Dataproc Serverless für Spark-Arbeitslasten auszuführen.

  4. Erstellen Sie einen Cloud Storage-Bucket zum Speichern der Metadaten-Importdateien.

  5. Erstellen Sie die folgenden Dataplex Universal Catalog-Ressourcen:

    1. Benutzerdefinierte Aspekttypen für die Einträge erstellen, die Sie importieren möchten.

    2. Benutzerdefinierte Eintragstypen für die Einträge erstellen, die Sie importieren möchten.

Erforderliche Rollen

Ein Dienstkonto repräsentiert die Identität eines Workflows und bestimmt, welche Berechtigungen der Workflow hat und auf welche Google Cloud Ressourcen er zugreifen kann. Sie benötigen ein Dienstkonto für Workflows (zum Ausführen der Pipeline) und für Dataproc Serverless (zum Ausführen des Connectors).

Sie können das Compute Engine-Standarddienstkonto (PROJECT_NUMBER-compute@developer.gserviceaccount.com) verwenden oder ein eigenes Dienstkonto (oder mehrere Konten) erstellen, um die Pipeline für verwaltete Verbindungen auszuführen.

Console

  1. Rufen Sie in der Google Cloud Console die Seite IAM auf.

    IAM aufrufen

  2. Wählen Sie das Projekt aus, in das Sie Metadaten importieren möchten.

  3. Klicken Sie auf Zugriff gewähren und geben Sie dann die E-Mail-Adresse des Dienstkontos ein.

  4. Weisen Sie dem Dienstkonto die folgenden Rollen zu:

    • Log-Autor
    • Dataplex Entry Group Owner
    • Dataplex Metadata Job Owner
    • Dataplex Catalog Editor
    • Dataproc-Bearbeiter
    • Dataproc-Worker
    • Zugriffsperson für Secret Manager-Secret für das Secret, in dem die Anmeldedaten für Ihre Datenquelle gespeichert sind
    • Storage Object User für den Cloud Storage-Bucket
    • Artifact Registry Reader: für das Artifact Registry-Repository, das das Connector-Image enthält
    • Dienstkontonutzer: Wenn Sie verschiedene Dienstkonten verwenden, weisen Sie dem Dienstkonto, auf dem Workflows ausgeführt werden, diese Rolle für das Dienstkonto zu, auf dem die serverlosen Dataproc-Batchjobs ausgeführt werden.
    • Workflow-Aufrufer: wenn Sie die Pipeline planen möchten
  5. Speichern Sie die Änderungen.

gcloud

  1. Weisen Sie dem Dienstkonto Rollen zu. Führen Sie folgende Befehle aus:

    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/logging.logWriter
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataplex.entryGroupOwner
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataplex.metadataJobOwner
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataplex.catalogEditor
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataproc.editor
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataproc.worker
    

    Ersetzen Sie Folgendes:

    • PROJECT_ID: Der Name des Zielprojekts Google Cloud, in das die Metadaten importiert werden sollen.
    • SERVICE_ACCOUNT_ID: das Dienstkonto, z. B. my-service-account@my-project.iam.gserviceaccount.com.
  2. Weisen Sie dem Dienstkonto die folgenden Rollen auf Ressourcenebene zu:

    gcloud secrets add-iam-policy-binding SECRET_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/secretmanager.secretaccessor
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/storage.objectUser \
        --condition=resource.name.startsWith('projects/_/buckets/BUCKET_ID')
    gcloud artifacts repositories add-iam-policy-binding REPOSITORY \
        --location=REPOSITORY_LOCATION \
        --member=SERVICE_ACCOUNT_ID} \
        --role=roles/artifactregistry.reader
    

    Ersetzen Sie Folgendes:

    • SECRET_ID: Die ID des Secrets, in dem die Anmeldedaten für Ihre Datenquelle gespeichert sind. Dazu wird das Format projects/PROJECT_ID/secrets/SECRET_ID verwendet.
    • BUCKET_ID: der Name des Cloud Storage-Buckets.
    • REPOSITORY: das Artifact Registry-Repository, das das Connector-Image enthält.
    • REPOSITORY_LOCATION: Der Google CloudSpeicherort, an dem das Repository gehostet wird.
  3. Weisen Sie dem Dienstkonto, auf dem Workflows ausgeführt werden, die Rolle roles/iam.serviceAccountUser für das Dienstkonto zu, auf dem die serverlosen Dataproc-Batchjobs ausgeführt werden. Sie müssen diese Rolle auch dann zuweisen, wenn Sie dasselbe Dienstkonto für Workflows und Dataproc Serverless verwenden.

    gcloud iam service-accounts add-iam-policy-binding \
        serviceAccount:SERVICE_ACCOUNT_ID \
        --member='SERVICE_ACCOUNT_ID' \
        --role='roles/iam.serviceAccountUser'
    

    Wenn Sie verschiedene Dienstkonten verwenden, ist der Wert für das Flag --member das Dienstkonto, mit dem die serverlosen Dataproc-Batchjobs ausgeführt werden.

  4. Wenn Sie die Pipeline planen möchten, weisen Sie dem Dienstkonto die folgende Rolle zu:

    gcloud projects add-iam-policy-binding PROJECT_ID \
     --member="SERVICE_ACCOUNT_ID" \
     --role=roles/workflows.invoker
    

Metadaten importieren

Wenn Sie Metadaten importieren möchten, erstellen Sie einen Workflow und führen Sie ihn aus, um die Pipeline für verwaltete Verbindungen auszuführen. Optional können Sie auch einen Zeitplan für die Ausführung der Pipeline erstellen.

Console

  1. Workflow erstellen Geben Sie die folgenden Informationen an:

    • Dienstkonto: Das Dienstkonto, das Sie im Abschnitt Erforderliche Rollen dieses Dokuments konfiguriert haben.
    • Verschlüsselung: Wählen Sie Google-managed encryption key aus.

    • Workflow definieren: Stellen Sie die folgende Definitionsdatei bereit:

      main:
        params: [args]
        steps:
          - init:
              assign:
              - WORKFLOW_ID: ${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
              - NETWORK_URI: ${default(map.get(args, "NETWORK_URI"), "")}
              - SUBNETWORK_URI: ${default(map.get(args, "SUBNETWORK_URI"), "")}
              - NETWORK_TAGS: ${default(map.get(args, "NETWORK_TAGS"), [])}
      
          - check_networking:
              switch:
                - condition: ${NETWORK_URI != "" and SUBNETWORK_URI != ""}
                  raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one."
                - condition: ${NETWORK_URI == "" and SUBNETWORK_URI == ""}
                  steps:
                   - submit_extract_job_with_default_network_uri:
                        assign:
                          - NETWORK_TYPE: "networkUri"
                          - NETWORKING: ${"projects/" + args.TARGET_PROJECT_ID + "/global/networks/default"}  
                - condition: ${NETWORK_URI != ""}
                  steps:
                    - submit_extract_job_with_network_uri:
                        assign:
                          - NETWORKING: ${NETWORK_URI}
                          - NETWORK_TYPE: "networkUri"
                - condition: ${SUBNETWORK_URI != ""}
                  steps:
                    - submit_extract_job_with_subnetwork_uri:
                        assign:
                          - NETWORKING: ${SUBNETWORK_URI}
                          - NETWORK_TYPE: "subnetworkUri"
              next: check_create_target_entry_group
      
          - check_create_target_entry_group:
              switch:
                - condition: ${args.CREATE_TARGET_ENTRY_GROUP == true}
                  next: create_target_entry_group
                - condition: ${args.CREATE_TARGET_ENTRY_GROUP == false}
                  next: prepare_pyspark_job_body
      
          - create_target_entry_group:
              call: http.post
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              next: prepare_pyspark_job_body
      
          - prepare_pyspark_job_body:
              assign:
                - pyspark_batch_body:
                    mainPythonFileUri: file:///main.py
                    args:
                      - ${"--target_project_id=" + args.TARGET_PROJECT_ID}
                      - ${"--target_location_id=" + args.CLOUD_REGION}
                      - ${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                      - ${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                      - ${"--output_folder=" + WORKFLOW_ID}
                      - ${args.ADDITIONAL_CONNECTOR_ARGS}
              next: add_jar_file_uri_if_present
      
          - add_jar_file_uri_if_present:
              switch:
                - condition: ${args.JAR_FILE_URI != "" and args.JAR_FILE_URI != null}
                  assign:
                    - pyspark_batch_body.jarFileUris : ${args.JAR_FILE_URI}
              next: generate_extract_job_link
      
          - generate_extract_job_link:
              call: sys.log
              args:
                  data: ${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}
                  severity: "INFO"
              next: submit_pyspark_extract_job
      
          - submit_pyspark_extract_job:
              call: http.post
              args:
                url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
                headers:
                  Content-Type: "application/json"
                query:
                  batchId: ${WORKFLOW_ID}
                body:
                  pysparkBatch: ${pyspark_batch_body}
                  runtimeConfig:
                      containerImage: ${args.CUSTOM_CONTAINER_IMAGE}
                  environmentConfig:
                      executionConfig:
                          serviceAccount: ${args.SERVICE_ACCOUNT}
                          stagingBucket: ${args.CLOUD_STORAGE_BUCKET_ID}
                          ${NETWORK_TYPE}: ${NETWORKING}
                          networkTags: ${NETWORK_TAGS}
              result: RESPONSE_MESSAGE
              next: check_pyspark_extract_job
      
          - check_pyspark_extract_job:
              call: http.get
              args:
                url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: PYSPARK_EXTRACT_JOB_STATUS
              next: check_pyspark_extract_job_done
      
          - check_pyspark_extract_job_done:
              switch:
                - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}
                  next: generate_import_logs_link
                - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"}
                  raise: ${PYSPARK_EXTRACT_JOB_STATUS}
                - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}
                  raise: ${PYSPARK_EXTRACT_JOB_STATUS}
              next: pyspark_extract_job_wait
      
          - pyspark_extract_job_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: check_pyspark_extract_job
      
          - generate_import_logs_link:
              call: sys.log
              args:
                  data: ${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}
                  severity: "INFO"
              next: submit_import_job
      
          - submit_import_job:
              call: http.post
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
                body:
                  type: IMPORT
                  import_spec:
                    source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                    entry_sync_mode: FULL
                    aspect_sync_mode: INCREMENTAL
                    log_level: ${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}
                    scope:
                      entry_groups: 
                        - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                      entry_types: ${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}
                      aspect_types: ${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}
              result: IMPORT_JOB_RESPONSE
              next: get_job_start_time
      
          - get_job_start_time:
              assign:
                - importJobStartTime: ${sys.now()}
              next: import_job_startup_wait
      
          - import_job_startup_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: initial_get_import_job
      
          - initial_get_import_job:
              call: http.get
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: IMPORT_JOB_STATUS
              next: check_import_job_status_available
      
          - check_import_job_status_available:
              switch:
                - condition: ${sys.now() - importJobStartTime > 300}  # 5 minutes = 300 seconds
                  next: kill_import_job
                - condition: ${"status" in IMPORT_JOB_STATUS.body}
                  next: check_import_job_done
              next: import_job_status_wait
      
          - import_job_status_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: check_import_job_status_available
      
          - check_import_job_done:
              switch:
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}
                  next: the_end
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}
                  raise: ${IMPORT_JOB_STATUS}
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}
                  raise: ${IMPORT_JOB_STATUS}
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "FAILED"}
                  raise: ${IMPORT_JOB_STATUS}
                - condition: ${sys.now() - importJobStartTime > 43200}  # 12 hours = 43200 seconds
                  next: kill_import_job
              next: import_job_wait
      
          - get_import_job:
              call: http.get
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: IMPORT_JOB_STATUS
              next: check_import_job_done
      
          - import_job_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: get_import_job
      
          - kill_import_job:
              call: http.post
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              next: get_killed_import_job
      
          - get_killed_import_job:
              call: http.get
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: KILLED_IMPORT_JOB_STATUS
              next: killed
      
          - killed:
              raise: ${KILLED_IMPORT_JOB_STATUS}
      
          - the_end:
              return: ${IMPORT_JOB_STATUS}
      
  2. Wenn Sie die Pipeline on demand ausführen möchten, führen Sie den Workflow aus.

    Geben Sie die folgenden Laufzeitargumente an:

    {
        "TARGET_PROJECT_ID": "PROJECT_ID",
        "CLOUD_REGION": "LOCATION_ID",
        "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID",
        "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN,
        "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID",
        "SERVICE_ACCOUNT": "SERVICE_ACCOUNT_ID",
        "ADDITIONAL_CONNECTOR_ARGS": [ADDITIONAL_CONNECTOR_ARGUMENTS],
        "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE",
        "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES],
        "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES],
        "IMPORT_JOB_LOG_LEVEL": "INFO",
        "JAR_FILE_URI": "",
        "NETWORK_TAGS": [],
        "NETWORK_URI": "",
        "SUBNETWORK_URI": ""
     }
    

    Ersetzen Sie Folgendes:

    • PROJECT_ID: Der Name des Zielprojekts Google Cloud, in das die Metadaten importiert werden sollen.
    • LOCATION_ID: der Ziel Google Cloud standort, an dem die serverlosen Dataproc- und Metadatenimportjobs ausgeführt werden und in den Metadaten importiert werden.
    • ENTRY_GROUP_ID: Die ID der Eintragsgruppe, in die Metadaten importiert werden sollen. Die ID der Eintragsgruppe kann Kleinbuchstaben, Ziffern und Bindestriche enthalten.

      Der vollständige Ressourcenname dieser Eintragsgruppe ist projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID.

    • CREATE_ENTRY_GROUP_BOOLEAN: Wenn die Pipeline die Eintragsgruppe erstellen soll, falls sie noch nicht in Ihrem Projekt vorhanden ist, legen Sie diesen Wert auf true fest.
    • BUCKET_ID: Der Name des Cloud Storage-Buckets, in dem die vom Connector generierte Datei für den Metadatenimport gespeichert wird. Bei jeder Workflow-Ausführung wird ein neuer Ordner erstellt.
    • SERVICE_ACCOUNT_ID: Das Dienstkonto, das Sie im Abschnitt Erforderliche Rollen dieses Dokuments konfiguriert haben. Das Dienstkonto führt den Connector in Dataproc Serverless aus.
    • ADDITIONAL_CONNECTOR_ARGUMENTS: Eine Liste mit zusätzlichen Argumenten, die an den Connector übergeben werden sollen. Beispiele finden Sie unter Benutzerdefinierten Connector für den Metadatenimport entwickeln. Setzen Sie jedes Argument in doppelte Anführungszeichen und trennen Sie die Argumente durch Kommas.
    • CONTAINER_IMAGE: Das benutzerdefinierte Container-Image des Connectors, das in Artifact Registry gehostet wird.
    • ENTRY_TYPES: Eine Liste der Eintragstypen, die für den Import infrage kommen, im Format projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID. LOCATION_ID muss entweder derselbeGoogle Cloud Speicherort sein, in den Sie Metadaten importieren, oder global.
    • ASPECT_TYPES: Eine Liste der Aspekttypen, die für den Import infrage kommen, im Format projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID. LOCATION_ID muss entweder derselbeGoogle Cloud Speicherort sein, in den Sie Metadaten importieren, oder global.
    • Optional: Geben Sie für das Argument NETWORK_TAGS eine Liste von Netzwerk-Tags an.
    • Optional: Geben Sie für das Argument NETWORK_URI den URI des VPC-Netzwerks an, das eine Verbindung zur Datenquelle herstellt. Wenn Sie ein Netzwerk angeben, lassen Sie das Subnetzargument weg.
    • Optional: Geben Sie für das SUBNETWORK_URI-Argument den URI des Subnetzwerks an, das mit der Datenquelle verbunden ist. Wenn Sie ein Subnetz angeben, lassen Sie das Netzwerkargument weg.

    Je nach Menge der importierten Metadaten kann die Ausführung der Pipeline mehrere Minuten oder länger dauern. Weitere Informationen zum Aufrufen des Fortschritts finden Sie unter Ergebnisse der Workflow-Ausführung aufrufen.

    Nachdem die Pipeline ausgeführt wurde, können Sie in Dataplex Universal Catalog nach den importierten Metadaten suchen.

  3. Optional: Wenn Sie die Pipeline nach einem Zeitplan ausführen möchten, erstellen Sie einen Zeitplan mit Cloud Scheduler. Geben Sie die folgenden Informationen an:

    • Häufigkeit: Ein unix-cron-Ausdruck, der den Zeitplan für die Ausführung der Pipeline definiert.
    • Workflow-Argument: Die Laufzeitargumente für den Connector, wie im vorherigen Schritt beschrieben.
    • Dienstkonto: Das Dienstkonto. Das Dienstkonto verwaltet den Scheduler.

gcloud

  1. Speichern Sie die folgende Arbeitslastdefinition als YAML-Datei:

    main:
      params: [args]
      steps:
        - init:
            assign:
            - WORKFLOW_ID: ${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
            - NETWORK_URI: ${default(map.get(args, "NETWORK_URI"), "")}
            - SUBNETWORK_URI: ${default(map.get(args, "SUBNETWORK_URI"), "")}
            - NETWORK_TAGS: ${default(map.get(args, "NETWORK_TAGS"), [])}
    
        - check_networking:
            switch:
              - condition: ${NETWORK_URI != "" and SUBNETWORK_URI != ""}
                raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one."
              - condition: ${NETWORK_URI == "" and SUBNETWORK_URI == ""}
                steps:
                 - submit_extract_job_with_default_network_uri:
                      assign:
                        - NETWORK_TYPE: "networkUri"
                        - NETWORKING: ${"projects/" + args.TARGET_PROJECT_ID + "/global/networks/default"}  
              - condition: ${NETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_network_uri:
                      assign:
                        - NETWORKING: ${NETWORK_URI}
                        - NETWORK_TYPE: "networkUri"
              - condition: ${SUBNETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_subnetwork_uri:
                      assign:
                        - NETWORKING: ${SUBNETWORK_URI}
                        - NETWORK_TYPE: "subnetworkUri"
            next: check_create_target_entry_group
    
        - check_create_target_entry_group:
            switch:
              - condition: ${args.CREATE_TARGET_ENTRY_GROUP == true}
                next: create_target_entry_group
              - condition: ${args.CREATE_TARGET_ENTRY_GROUP == false}
                next: prepare_pyspark_job_body
    
        - create_target_entry_group:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: prepare_pyspark_job_body
    
        - prepare_pyspark_job_body:
            assign:
              - pyspark_batch_body:
                  mainPythonFileUri: file:///main.py
                  args:
                    - ${"--target_project_id=" + args.TARGET_PROJECT_ID}
                    - ${"--target_location_id=" + args.CLOUD_REGION}
                    - ${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                    - ${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                    - ${"--output_folder=" + WORKFLOW_ID}
                    - ${args.ADDITIONAL_CONNECTOR_ARGS}
            next: add_jar_file_uri_if_present
    
        - add_jar_file_uri_if_present:
            switch:
              - condition: ${args.JAR_FILE_URI != "" and args.JAR_FILE_URI != null}
                assign:
                  - pyspark_batch_body.jarFileUris : ${args.JAR_FILE_URI}
            next: generate_extract_job_link
    
        - generate_extract_job_link:
            call: sys.log
            args:
                data: ${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}
                severity: "INFO"
            next: submit_pyspark_extract_job
    
        - submit_pyspark_extract_job:
            call: http.post
            args:
              url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              headers:
                Content-Type: "application/json"
              query:
                batchId: ${WORKFLOW_ID}
              body:
                pysparkBatch: ${pyspark_batch_body}
                runtimeConfig:
                    containerImage: ${args.CUSTOM_CONTAINER_IMAGE}
                environmentConfig:
                    executionConfig:
                        serviceAccount: ${args.SERVICE_ACCOUNT}
                        stagingBucket: ${args.CLOUD_STORAGE_BUCKET_ID}
                        ${NETWORK_TYPE}: ${NETWORKING}
                        networkTags: ${NETWORK_TAGS}
            result: RESPONSE_MESSAGE
            next: check_pyspark_extract_job
    
        - check_pyspark_extract_job:
            call: http.get
            args:
              url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: PYSPARK_EXTRACT_JOB_STATUS
            next: check_pyspark_extract_job_done
    
        - check_pyspark_extract_job_done:
            switch:
              - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}
                next: generate_import_logs_link
              - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"}
                raise: ${PYSPARK_EXTRACT_JOB_STATUS}
              - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}
                raise: ${PYSPARK_EXTRACT_JOB_STATUS}
            next: pyspark_extract_job_wait
    
        - pyspark_extract_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_pyspark_extract_job
    
        - generate_import_logs_link:
            call: sys.log
            args:
                data: ${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}
                severity: "INFO"
            next: submit_import_job
    
        - submit_import_job:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              body:
                type: IMPORT
                import_spec:
                  source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                  entry_sync_mode: FULL
                  aspect_sync_mode: INCREMENTAL
                  log_level: ${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}
                  scope:
                    entry_groups: 
                      - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                    entry_types: ${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}
                    aspect_types: ${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}
            result: IMPORT_JOB_RESPONSE
            next: get_job_start_time
    
        - get_job_start_time:
            assign:
              - importJobStartTime: ${sys.now()}
            next: import_job_startup_wait
    
        - import_job_startup_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: initial_get_import_job
    
        - initial_get_import_job:
            call: http.get
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_status_available
    
        - check_import_job_status_available:
            switch:
              - condition: ${sys.now() - importJobStartTime > 300}  # 5 minutes = 300 seconds
                next: kill_import_job
              - condition: ${"status" in IMPORT_JOB_STATUS.body}
                next: check_import_job_done
            next: import_job_status_wait
    
        - import_job_status_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_import_job_status_available
    
        - check_import_job_done:
            switch:
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}
                next: the_end
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}
                raise: ${IMPORT_JOB_STATUS}
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}
                raise: ${IMPORT_JOB_STATUS}
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "FAILED"}
                raise: ${IMPORT_JOB_STATUS}
              - condition: ${sys.now() - importJobStartTime > 43200}  # 12 hours = 43200 seconds
                next: kill_import_job
            next: import_job_wait
    
        - get_import_job:
            call: http.get
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_done
    
        - import_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: get_import_job
    
        - kill_import_job:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: get_killed_import_job
    
        - get_killed_import_job:
            call: http.get
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: KILLED_IMPORT_JOB_STATUS
            next: killed
    
        - killed:
            raise: ${KILLED_IMPORT_JOB_STATUS}
    
        - the_end:
            return: ${IMPORT_JOB_STATUS}
    
  2. Definieren Sie Bash-Variablen, erstellen Sie den Workflow und erstellen Sie optional einen Zeitplan für die Ausführung der Pipeline:

    # Define Bash variables (replace with your actual values)
    project_id="PROJECT_ID"
    region="LOCATION_ID"
    service_account="SERVICE_ACCOUNT_ID"
    workflow_source="WORKFLOW_DEFINITION_FILE.yaml"
    workflow_name="WORKFLOW_NAME"
    workflow_args='WORKFLOW_ARGUMENTS'
    
    # Create Workflows resource
    gcloud workflows deploy ${workflow_name} \
      --project=${project_id} \
      --location=${region} \
      --source=${workflow_source} \
      --service-account=${service_account}
    
    # Create Cloud Scheduler job
    gcloud scheduler jobs create http ${workflow_name}-scheduler \
      --project=${project_id} \
      --location=${region} \
      --schedule="CRON_SCHEDULE_EXPRESSION" \
      --time-zone="UTC" \
      --uri="https://workflowexecutions.googleapis.com/v1/projects/${project_id}/locations/${region}/workflows/${workflow_name}/executions" \
      --http-method="POST" \
      --oauth-service-account-email=${service_account} \
      --headers="Content-Type=application/json" \
      --message-body='{"argument": ${workflow_args}}'
    

    Ersetzen Sie Folgendes:

    • PROJECT_ID: Der Name des Zielprojekts Google Cloud, in das die Metadaten importiert werden sollen.
    • LOCATION_ID: der Ziel Google Cloud standort, an dem die serverlosen Dataproc- und Metadatenimportjobs ausgeführt werden und in den Metadaten importiert werden.
    • SERVICE_ACCOUNT_ID: Das Dienstkonto, das Sie im Abschnitt Erforderliche Rollen dieses Dokuments konfiguriert haben.
    • WORKFLOW_DEFINITION_FILE: Der Pfad zur YAML-Datei mit der Workflowdefinition.
    • WORKFLOW_NAME: Der Name des Workflows.
    • WORKFLOW_ARGUMENTS: die Laufzeitargumente, die an den Connector übergeben werden sollen. Die Argumente haben das JSON-Format:

      {
          "TARGET_PROJECT_ID": "PROJECT_ID",
          "CLOUD_REGION": "LOCATION_ID",
          "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID",
          "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN,
          "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID",
          "SERVICE_ACCOUNT": "SERVICE_ACCOUNT_ID",
          "ADDITIONAL_CONNECTOR_ARGS": [ADDITIONAL_CONNECTOR_ARGUMENTS],
          "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE",
          "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES],
          "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES],
          "IMPORT_JOB_LOG_LEVEL": "INFO",
          "JAR_FILE_URI": "",
          "NETWORK_TAGS": [],
          "NETWORK_URI": "",
          "SUBNETWORK_URI": ""
       }
      

      Bei Cloud Scheduler werden die doppelten Anführungszeichen innerhalb des Strings in Anführungszeichen mit Schrägstrichen (\) maskiert. Beispiel: --message-body="{\"argument\": \"{\\\"key\\\": \\\"value\\\"}\"}".

      Ersetzen Sie Folgendes:

      • ENTRY_GROUP_ID: Die ID der Eintragsgruppe, in die Metadaten importiert werden sollen. Die ID der Eintragsgruppe kann Kleinbuchstaben, Ziffern und Bindestriche enthalten.

        Der vollständige Ressourcenname dieser Eintragsgruppe ist projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID.

      • CREATE_ENTRY_GROUP_BOOLEAN: Wenn die Pipeline die Eintragsgruppe erstellen soll, falls sie noch nicht in Ihrem Projekt vorhanden ist, legen Sie diesen Wert auf true fest.
      • BUCKET_ID: Der Name des Cloud Storage-Buckets, in dem die vom Connector generierte Datei für den Metadatenimport gespeichert wird. Bei jeder Workflow-Ausführung wird ein neuer Ordner erstellt.
      • ADDITIONAL_CONNECTOR_ARGUMENTS: Eine Liste mit zusätzlichen Argumenten, die an den Connector übergeben werden sollen. Beispiele finden Sie unter Benutzerdefinierten Connector für den Metadatenimport entwickeln.
      • CONTAINER_IMAGE: Das benutzerdefinierte Container-Image des Connectors, das in Artifact Registry gehostet wird.
      • ENTRY_TYPES: Eine Liste der Eintragstypen, die für den Import infrage kommen, im Format projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID. LOCATION_ID muss entweder derselbeGoogle Cloud Speicherort sein, in den Sie Metadaten importieren, oder global.
      • ASPECT_TYPES: Eine Liste der Aspekttypen, die für den Import infrage kommen, im Format projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID. LOCATION_ID muss entweder derselbeGoogle Cloud Speicherort sein, in den Sie Metadaten importieren, oder global.
      • Optional: Geben Sie für das Argument NETWORK_TAGS eine Liste von Netzwerk-Tags an.
      • Optional: Geben Sie für das Argument NETWORK_URI den URI des VPC-Netzwerks an, das eine Verbindung zur Datenquelle herstellt. Wenn Sie ein Netzwerk angeben, lassen Sie das Subnetzargument weg.
      • Optional: Geben Sie für das SUBNETWORK_URI-Argument den URI des Subnetzwerks an, das mit der Datenquelle verbunden ist. Wenn Sie ein Subnetz angeben, lassen Sie das Netzwerkargument weg.
    • CRON_SCHEDULE_EXPRESSION: Ein Cron-Ausdruck, der den Zeitplan für die Ausführung der Pipeline definiert. Wenn Sie den Zeitplan beispielsweise jeden Tag um Mitternacht ausführen möchten, verwenden Sie den Ausdruck 0 0 * * *.

  3. So führen Sie die Pipeline bei Bedarf aus: Workflow ausführen:

    workflow_name="WORKFLOW_NAME"
    workflow_args='WORKFLOW_ARGUMENTS'
    
    gcloud workflows run "${workflow_name}" --project=${project_id} --location=${location} --data '${workflow_args}'
    

    Die Workflow-Argumente sind im JSON-Format, aber nicht maskiert.

    Je nachdem, wie viele Metadaten Sie importieren, kann es einige Minuten oder länger dauern, bis der Workflow ausgeführt wird. Weitere Informationen dazu, wie Sie den Fortschritt ansehen können, finden Sie unter Ergebnisse der Workflow-Ausführung aufrufen.

    Nachdem die Pipeline ausgeführt wurde, können Sie in Dataplex Universal Catalog nach den importierten Metadaten suchen.

Terraform

  1. Klonen Sie das cloud-dataplex-Repository.

    Das Repository enthält die folgenden Terraform-Dateien:

    • main.tf: Definiert die zu erstellenden Google Cloud -Ressourcen.
    • variables.tf: deklariert die Variablen.
    • byo-connector.tfvars: Definiert die Variablen für Ihre Pipeline für verwaltete Verbindungen.
  2. Bearbeiten Sie die Datei .tfvars und ersetzen Sie die Platzhalter durch die Informationen für Ihren Connector.

    project_id                      = "PROJECT_ID"
    region                          = "LOCATION_ID"
    service_account                 = "SERVICE_ACCOUNT_ID"
    cron_schedule                   = "CRON_SCHEDULE_EXPRESSION"
    workflow_args                   = {"TARGET_PROJECT_ID": "PROJECT_ID", "CLOUD_REGION": "LOCATION_ID", "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID", "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN, "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID", "SERVICE_ACCOUNT": "SERVICE_ACCOUNT_ID", "ADDITIONAL_CONNECTOR_ARGS": [ADDITIONAL_CONNECTOR_ARGUMENTS], "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE", "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES], "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES], "IMPORT_JOB_LOG_LEVEL": "INFO", "NETWORK_TAGS": [], "NETWORK_URI": "", "SUBNETWORK_URI": ""}
    
    
    workflow_source                 = <<EOF
    main:
      params: [args]
      steps:
        - init:
            assign:
            - WORKFLOW_ID: $${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
            - NETWORK_URI: $${default(map.get(args, "NETWORK_URI"), "")}
            - SUBNETWORK_URI: $${default(map.get(args, "SUBNETWORK_URI"), "")}
            - NETWORK_TAGS: $${default(map.get(args, "NETWORK_TAGS"), [])}
    
        - check_networking:
            switch:
              - condition: $${NETWORK_URI != "" and SUBNETWORK_URI != ""}
                raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one."
              - condition: $${NETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_network_uri:
                      assign:
                        - NETWORKING: $${NETWORK_URI}
                        - NETWORK_TYPE: "networkUri"
              - condition: $${SUBNETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_subnetwork_uri:
                      assign:
                        - NETWORKING: $${SUBNETWORK_URI}
                        - NETWORK_TYPE: "subnetworkUri"
            next: set_default_networking
    
        - set_default_networking:
            assign:
              - NETWORK_TYPE: "networkUri"
              - NETWORKING: $${"projects/" + args.TARGET_PROJECT_ID + "/global/networks/default"}
            next: check_create_target_entry_group
    
        - check_create_target_entry_group:
            switch:
              - condition: $${args.CREATE_TARGET_ENTRY_GROUP == true}
                next: create_target_entry_group
              - condition: $${args.CREATE_TARGET_ENTRY_GROUP == false}
                next: generate_extract_job_link
    
        - create_target_entry_group:
            call: http.post
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: generate_extract_job_link
    
        - generate_extract_job_link:
            call: sys.log
            args:
                data: $${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}
                severity: "INFO"
            next: submit_pyspark_extract_job
    
        - submit_pyspark_extract_job:
            call: http.post
            args:
              url: $${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              headers:
                Content-Type: "application/json"
              query:
                batchId: $${WORKFLOW_ID}
              body:
                pysparkBatch:
                  mainPythonFileUri: file:///main.py
                  args:
                    - $${"--target_project_id=" + args.TARGET_PROJECT_ID}
                    - $${"--target_location_id=" + args.CLOUD_REGION}
                    - $${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                    - $${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                    - $${"--output_folder=" + WORKFLOW_ID}
                    - $${args.ADDITIONAL_CONNECTOR_ARGS}
                runtimeConfig:
                    containerImage: $${args.CUSTOM_CONTAINER_IMAGE}
                environmentConfig:
                    executionConfig:
                        serviceAccount: $${args.SERVICE_ACCOUNT}
                        stagingBucket: $${args.CLOUD_STORAGE_BUCKET_ID}
                        $${NETWORK_TYPE}: $${NETWORKING}
                        networkTags: $${NETWORK_TAGS}
            result: RESPONSE_MESSAGE
            next: check_pyspark_extract_job
    
        - check_pyspark_extract_job:
            call: http.get
            args:
              url: $${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: PYSPARK_EXTRACT_JOB_STATUS
            next: check_pyspark_extract_job_done
    
        - check_pyspark_extract_job_done:
            switch:
              - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}
                next: generate_import_logs_link
              - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"}
                raise: $${PYSPARK_EXTRACT_JOB_STATUS}
              - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}
                raise: $${PYSPARK_EXTRACT_JOB_STATUS}
            next: pyspark_extract_job_wait
    
        - pyspark_extract_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_pyspark_extract_job
    
        - generate_import_logs_link:
            call: sys.log
            args:
                data: $${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}
                severity: "INFO"
            next: submit_import_job
    
        - submit_import_job:
            call: http.post
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              body:
                type: IMPORT
                import_spec:
                  source_storage_uri: $${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                  entry_sync_mode: FULL
                  aspect_sync_mode: INCREMENTAL
                  log_level: $${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}
                  scope:
                    entry_groups: 
                      - $${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                    entry_types: $${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}
                    aspect_types: $${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}
            result: IMPORT_JOB_RESPONSE
            next: get_job_start_time
    
        - get_job_start_time:
            assign:
              - importJobStartTime: $${sys.now()}
            next: import_job_startup_wait
    
        - import_job_startup_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: initial_get_import_job
    
        - initial_get_import_job:
            call: http.get
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_status_available
    
        - check_import_job_status_available:
            switch:
              - condition: $${sys.now() - importJobStartTime > 300}  # 5 minutes = 300 seconds
                next: kill_import_job
              - condition: $${"status" in IMPORT_JOB_STATUS.body}
                next: check_import_job_done
            next: import_job_status_wait
    
        - import_job_status_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_import_job_status_available
    
        - check_import_job_done:
            switch:
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}
                next: the_end
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}
                raise: $${IMPORT_JOB_STATUS}
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}
                raise: $${IMPORT_JOB_STATUS}
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "FAILED"}
                raise: $${IMPORT_JOB_STATUS}
              - condition: $${sys.now() - importJobStartTime > 43200}  # 12 hours = 43200 seconds
                next: kill_import_job
            next: import_job_wait
    
        - get_import_job:
            call: http.get
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_done
    
        - import_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: get_import_job
    
        - kill_import_job:
            call: http.post
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: get_killed_import_job
    
        - get_killed_import_job:
            call: http.get
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: KILLED_IMPORT_JOB_STATUS
            next: killed
    
        - killed:
            raise: $${KILLED_IMPORT_JOB_STATUS}
    
        - the_end:
            return: $${IMPORT_JOB_STATUS}
    EOF
    

    Ersetzen Sie Folgendes:

    • PROJECT_ID: Der Name des Zielprojekts Google Cloud, in das die Metadaten importiert werden sollen.
    • LOCATION_ID: der Ziel Google Cloud standort, an dem die serverlosen Dataproc- und Metadatenimportjobs ausgeführt werden und in den Metadaten importiert werden.
    • SERVICE_ACCOUNT_ID: Das Dienstkonto, das Sie im Abschnitt Erforderliche Rollen dieses Dokuments konfiguriert haben.
    • CRON_SCHEDULE_EXPRESSION: Ein Cron-Ausdruck, der den Zeitplan für die Ausführung der Pipeline definiert. Wenn Sie den Zeitplan beispielsweise jeden Tag um Mitternacht ausführen möchten, verwenden Sie den Ausdruck 0 0 * * *.
    • ENTRY_GROUP_ID: Die ID der Eintragsgruppe, in die Metadaten importiert werden sollen. Die ID der Eintragsgruppe kann Kleinbuchstaben, Ziffern und Bindestriche enthalten.

      Der vollständige Ressourcenname dieser Eintragsgruppe ist projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID.

    • CREATE_ENTRY_GROUP_BOOLEAN: Wenn die Pipeline die Eintragsgruppe erstellen soll, falls sie noch nicht in Ihrem Projekt vorhanden ist, legen Sie diesen Wert auf true fest.
    • BUCKET_ID: Der Name des Cloud Storage-Buckets, in dem die vom Connector generierte Datei für den Metadatenimport gespeichert wird. Bei jeder Workflow-Ausführung wird ein neuer Ordner erstellt.
    • ADDITIONAL_CONNECTOR_ARGUMENTS: Eine Liste mit zusätzlichen Argumenten, die an den Connector übergeben werden sollen. Beispiele finden Sie unter Benutzerdefinierten Connector für den Metadatenimport entwickeln. Setzen Sie jedes Argument in doppelte Anführungszeichen und trennen Sie die Argumente durch Kommas.
    • CONTAINER_IMAGE: Das benutzerdefinierte Container-Image des Connectors, das in Artifact Registry gehostet wird.
    • ENTRY_TYPES: Eine Liste der Eintragstypen, die für den Import infrage kommen, im Format projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID. LOCATION_ID muss entweder derselbeGoogle Cloud Speicherort sein, in den Sie Metadaten importieren, oder global.
    • ASPECT_TYPES: Eine Liste der Aspekttypen, die für den Import infrage kommen, im Format projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID. LOCATION_ID muss entweder derselbeGoogle Cloud Speicherort sein, in den Sie Metadaten importieren, oder global.
    • Optional: Geben Sie für das Argument NETWORK_TAGS eine Liste von Netzwerk-Tags an.
    • Optional: Geben Sie für das Argument NETWORK_URI den URI des VPC-Netzwerks an, das eine Verbindung zur Datenquelle herstellt. Wenn Sie ein Netzwerk angeben, lassen Sie das Subnetzargument weg.
    • Optional: Geben Sie für das Argument SUBNETWORK_URI den URI des Subnetzwerks an, das mit der Datenquelle verbunden ist. Wenn Sie ein Subnetz angeben, lassen Sie das Netzwerkargument weg.
  3. Initialisieren Sie Terraform:

    terraform init
    
  4. Validieren Sie Terraform mit Ihrer .tfvars-Datei:

    terraform plan --var-file=CONNECTOR_VARIABLES_FILE.tfvars
    

    Ersetzen Sie CONNECTOR_VARIABLES_FILE durch den Namen Ihrer Variablendefinitionsdatei.

  5. Stellen Sie Terraform mit Ihrer Datei .tfvars bereit:

    terraform apply --var-file=CONNECTOR_VARIABLES_FILE.tfvars
    

    Terraform erstellt einen Workflow und einen Cloud Scheduler-Job im angegebenen Projekt. Workflows führt die Pipeline nach dem von Ihnen angegebenen Zeitplan aus.

    Je nachdem, wie viele Metadaten Sie importieren, kann es einige Minuten oder länger dauern, bis der Workflow ausgeführt wird. Weitere Informationen dazu, wie Sie den Fortschritt ansehen können, finden Sie unter Ergebnisse der Workflow-Ausführung aufrufen.

    Nachdem die Pipeline ausgeführt wurde, können Sie in Dataplex Universal Catalog nach den importierten Metadaten suchen.

Joblogs ansehen

Verwenden Sie Cloud Logging, um Logs für eine verwaltete Konnektivitätspipeline aufzurufen. Die Log-Nutzlast enthält einen Link zu den Logs für den Dataproc Serverless-Batchjob und den Metadatenimportjob, sofern zutreffend. Weitere Informationen finden Sie unter Workflow-Logs ansehen.

Fehlerbehebung

Versuchen Sie es mit diesen Vorschlägen zur Fehlerbehebung:

  • Konfigurieren Sie die Protokollebene des Importjobs für den Metadatenjob so, dass Protokollierung auf Debug-Ebene anstelle von Protokollierung auf Info-Ebene verwendet wird.
  • Sehen Sie sich die Logs für den serverlosen Dataproc-Batchjob (für Connector-Ausführungen) und den Metadatenimportjob an. Weitere Informationen finden Sie unter Dataproc Serverless für Spark-Logs abfragen und Metadaten-Joblogs abfragen.
  • Wenn ein Eintrag nicht über die Pipeline importiert werden kann und die Fehlermeldung nicht genügend Informationen enthält, erstellen Sie einen benutzerdefinierten Eintrag mit denselben Details in einer Testeintragsgruppe. Weitere Informationen finden Sie unter Benutzerdefinierten Eintrag erstellen.

Nächste Schritte