Daten mit Workflows aus Cloud Storage in BigQuery laden

Last reviewed 2021-05-12 UTC

In dieser Anleitung wird gezeigt, wie Sie serverlose Workflows mithilfe von Workflows, Cloud Run-Funktionen und Firestore zum Laden von Rohdaten, z. B. Ereignisprotokollen, aus Cloud Storage in BigQuery ausführen. Analyseplattformen haben in der Regel ein Orchestrierungstool, um periodisch Daten mit BigQuery-Jobs in BigQuery zu laden und diese dann anzupassen. So werden Geschäftsmesswerte mit SQL-Anweisungen bereitgestellt, darunter prozedurale Sprachanweisungen in BigQuery. Diese Anleitung richtet sich an Entwickler und Architekten, die serverlose, ereignisgesteuerte Pipelines zur Datenverarbeitung erstellen möchten. Es wird davon ausgegangen, dass Sie mit YAML, SQL und Python vertraut sind.

Architektur

Das folgende Diagramm zeigt die allgemeine Architektur einer serverlosen ELT-Pipeline (Extrahieren, Laden und Transformieren), die Workflows verwendet.

Pipeline extrahieren, laden und transformieren.

Betrachten Sie im obigen Diagramm eine Einzelhandelsplattform, die regelmäßig Verkaufsereignisse als Dateien aus verschiedenen Geschäften erfasst und die Dateien anschließend in einen Cloud Storage-Bucket schreibt. Die Ereignisse werden verwendet, um Geschäftsmesswerte durch Importieren und Verarbeiten in BigQuery bereitzustellen. Diese Architektur bietet ein zuverlässiges und serverloses Orchestrierungssystem zum Importieren Ihrer Dateien in BigQuery und ist in die beiden folgenden Module unterteilt:

  • Dateiliste: Verwaltet die Liste der nicht verarbeiteten Dateien, die einem Cloud Storage-Bucket in einer Firestore-Sammlung hinzugefügt wurden. Dieses Modul arbeitet mit einer Cloud Run-Funktion, die durch das Speicherereignis Objekt finalisieren ausgelöst wird. Dieses wird generiert, wenn dem Cloud Storage-Bucket eine neue Datei hinzugefügt wird. Der Dateiname wird an das Array files der Sammlung mit dem Namen new in Firestore angehängt.
  • Workflow: Führt die geplanten Workflows werden aus. Cloud Scheduler löst einen Workflow aus, der eine Reihe von Schritten gemäß einer YAML-basierten Syntax ausführt, um das Laden zu orchestrieren und dann die Daten durch Aufrufen von Cloud Run-Funktionen in BigQuery zu transformieren. Die Schritte im Workflow rufen Cloud Run-Funktionen auf, um die folgenden Aufgaben auszuführen:

    • Erstellen und starten Sie einen BigQuery-Ladejob.
    • Fragen Sie den Ladejobstatus ab.
    • Erstellen und starten Sie den Transformationsabfragejob.
    • Fragen Sie den Status des Transformationsjobs ab.

Durch die Verwendung der Transaktionen zum Verwalten der Liste der neuen Dateien in Firestore wird sichergestellt, dass keine Datei übersehen wird, wenn sie von einem Workflow in BigQuery importiert werden. Separate Ausführungen des Workflows sind idempotent, indem Job-Metadaten und der Status in Firestore gespeichert werden.

Ziele

  • Erstellen Sie eine Firestore-Datenbank.
  • Richten Sie einen Cloud Run-Funktionstrigger ein, um Dateien zu verfolgen, die dem Cloud Storage-Bucket in Firestore hinzugefügt wurden.
  • Cloud Run-Funktionen zum Ausführen und Überwachen von BigQuery-Jobs bereitstellen.
  • Einen Workflow bereitstellen und ausführen, um den Prozess zu automatisieren.

Kosten

In diesem Dokument verwenden Sie die folgenden kostenpflichtigen Komponenten von Google Cloud:

Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen. Neuen Google Cloud-Nutzern steht möglicherweise eine kostenlose Testversion zur Verfügung.

Nach Abschluss der in diesem Dokument beschriebenen Aufgaben können Sie weitere Kosten vermeiden, indem Sie die erstellten Ressourcen löschen. Weitere Informationen finden Sie unter Bereinigen.

Hinweise

  1. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  2. Make sure that billing is enabled for your Google Cloud project.

  3. Enable the Cloud Build, Cloud Run functions, Identity and Access Management, Resource Manager, and Workflows APIs.

    Enable the APIs

  4. Rufen Sie die Seite Willkommen auf und notieren Sie sich die Projekt-ID, die Sie in einem späteren Schritt benötigen.

    Zur Begrüßungsseite

  5. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

Umgebung vorbereiten

Um Ihre Umgebung vorzubereiten, erstellen Sie eine Firestore-Datenbank, klonen Sie die Codebeispiele aus dem GitHub-Repository, erstellen Sie Ressourcen mit Terraform, bearbeiten Sie die YAML-Datei für Workflows und installieren Sie die Anforderungen für den Dateigenerator.

  1. So erstellen Sie eine Firestore-Datenbank:

    1. Rufen Sie in der Google Cloud Console die Seite Firestore auf.

      Firestore aufrufen

    2. Klicken Sie auf Nativen Modus auswählen.

    3. Wählen Sie im Menü Standort auswählen die Region aus, in der Sie die Firestore-Datenbank hosten möchten. Wir empfehlen, eine Region in der Nähe Ihres physischen Standorts auszuwählen.

    4. Klicken Sie auf Datenbank erstellen.

  2. Klonen Sie in Cloud Shell das Quell-Repository:

    cd $HOME && git clone https://github.com/GoogleCloudPlatform/workflows-demos
    cd workflows-demos/workflows-bigquery-load
    
  3. Erstellen Sie in Cloud Shell mit Terraform die folgenden Ressourcen:

    terraform init
    terraform apply \
        -var project_id=PROJECT_ID \
        -var region=REGION \
        -var zone=ZONE \
        --auto-approve
    

    Ersetzen Sie Folgendes:

    • PROJECT_ID: Ihre Google Cloud-Projekt-ID
    • REGION: Ein bestimmter geografischer Standort von Google Cloud zum Hosten Ihrer Ressourcen, z. B. us-central1
    • ZONE: Ein Standort innerhalb einer Region zum Hosten Ihrer Ressourcen, z. B. us-central1-b

    Es sollte eine Meldung ähnlich der folgenden angezeigt werden: Apply complete! Resources: 7 added, 0 changed, 1 destroyed.

    Mit Terraform können Sie Infrastruktur in großem Maßstab sicher und vorhersehbar erstellen, ändern und aktualisieren. Die folgenden Ressourcen werden in Ihrem Projekt erstellt:

    • Dienstkonten mit den erforderlichen Berechtigungen, um einen sicheren Zugriff auf Ihre Ressourcen zu gewährleisten.
    • Ein BigQuery-Dataset mit dem Namen serverless_elt_dataset und eine Tabelle namens word_count, um die eingehenden Dateien zu laden
    • Ein Cloud Storage-Bucket mit dem Namen ${project_id}-ordersbucket für das Staging von Eingabedateien
    • Die folgenden fünf Cloud Run-Funktionen:
      • file_add_handler fügt den Namen der Dateien hinzu, die dem Cloud Storage-Bucket in der Firestore-Sammlung hinzugefügt wurden.
      • create_job erstellt einen neuen BigQuery-Ladejob und verknüpft Dateien in der Firebase-Sammlung mit dem Job.
      • create_query erstellt einen neuen BigQuery-Abfragejob.
      • poll_bigquery_job ruft den Status eines BigQuery-Jobs ab.
      • run_bigquery_job startet einen BigQuery-Job.
  4. Rufen Sie die URLs für die Cloud Run-Funktionen create_job, create_query, poll_job und run_bigquery_job ab, die Sie im vorherigen Schritt bereitgestellt haben.

    gcloud functions describe create_job | grep url
    gcloud functions describe poll_bigquery_job | grep url
    gcloud functions describe run_bigquery_job | grep url
    gcloud functions describe create_query | grep url
    

    Die Ausgabe sieht in etwa so aus:

    url: https://REGION-PROJECT_ID.cloudfunctions.net/create_job
    url: https://REGION-PROJECT_ID.cloudfunctions.net/poll_bigquery_job
    url: https://REGION-PROJECT_ID.cloudfunctions.net/run_bigquery_job
    url: https://REGION-PROJECT_ID.cloudfunctions.net/create_query
    

    Notieren Sie sich diese URLs, da sie beim Bereitstellen des Workflows benötigt werden.

Workflow erstellen und bereitstellen

  1. Öffnen Sie in Cloud Shell die Quelldatei für den Workflow,workflow.yaml:

    main:
      steps:
        - constants:
            assign:
              - create_job_url: CREATE_JOB_URL
              - poll_job_url: POLL_BIGQUERY_JOB_URL
              - run_job_url: RUN_BIGQUERY_JOB_URL
              - create_query_url: CREATE_QUERY_URL
              - region: BQ_REGION
              - table_name: BQ_DATASET_TABLE_NAME
            next: createJob
    
        - createJob:
            call: http.get
            args:
              url: ${create_job_url}
              auth:
                  type: OIDC
              query:
                  region: ${region}
                  table_name: ${table_name}
            result: job
            next: setJobId
    
        - setJobId:
            assign:
              - job_id: ${job.body.job_id}
            next: jobCreateCheck
    
        - jobCreateCheck:
            switch:
              - condition: ${job_id == Null}
                next: noOpJob
            next: runLoadJob
    
        - runLoadJob:
            call: runBigQueryJob
            args:
                job_id: ${job_id}
                run_job_url: ${run_job_url}
                poll_job_url: ${poll_job_url}
            result: jobStatus
            next: loadRunCheck
    
        - loadRunCheck:
            switch:
              - condition: ${jobStatus == 2}
                next: createQueryJob
            next: failedLoadJob
    
        - createQueryJob:
            call: http.get
            args:
              url: ${create_query_url}
              query:
                  qs: "select count(*) from serverless_elt_dataset.word_count"
                  region: "US"
              auth:
                  type: OIDC
            result: queryjob
            next: setQueryJobId
    
        - setQueryJobId:
            assign:
              - qid: ${queryjob.body.job_id}
            next: queryCreateCheck
    
        - queryCreateCheck:
            switch:
              - condition: ${qid == Null}
                next: failedQueryJob
            next: runQueryJob
    
        - runQueryJob:
            call: runBigQueryJob
            args:
              job_id: ${qid}
              run_job_url: ${run_job_url}
              poll_job_url: ${poll_job_url}
            result: queryJobState
            next: runQueryCheck
    
        - runQueryCheck:
            switch:
              - condition: ${queryJobState == 2}
                next: allDone
            next: failedQueryJob
    
        - noOpJob:
            return: "No files to import"
            next: end
    
        - allDone:
            return: "All done!"
            next: end
    
        - failedQueryJob:
            return: "Query job failed"
            next: end
    
        - failedLoadJob:
            return: "Load job failed"
            next: end
    
    
    runBigQueryJob:
      params: [job_id, run_job_url, poll_job_url]
      steps:
        - startBigQueryJob:
            try:
              call: http.get
              args:
                  url: ${run_job_url}
                  query:
                    job_id: ${job_id}
                  auth:
                    type: OIDC
                  timeout: 600
              result: submitJobState
            retry: ${http.default_retry}
            next: validateSubmit
    
        - validateSubmit:
            switch:
              - condition: ${submitJobState.body.status == 1}
                next: sleepAndPollLoad
            next: returnState
    
        - returnState:
            return: ${submitJobState.body.status}
    
        - sleepAndPollLoad:
            call: sys.sleep
            args:
              seconds: 5
            next: pollJob
    
        - pollJob:
            try:
              call: http.get
              args:
                url: ${poll_job_url}
                query:
                  job_id: ${job_id}
                auth:
                  type: OIDC
                timeout: 600
              result: pollJobState
            retry:
              predicate: ${http.default_retry_predicate}
              max_retries: 10
              backoff:
                initial_delay: 1
                max_delay: 60
                multiplier: 2
            next: stateCheck
    
        - stateCheck:
            switch:
              - condition: ${pollJobState.body.status == 2}
                return: ${pollJobState.body.status}
              - condition: ${pollJobState.body.status == 3}
                return: ${pollJobState.body.status}
            next: sleepAndPollLoad

    Ersetzen Sie Folgendes:

    • CREATE_JOB_URL: die URL der Funktion zum Erstellen eines neuen Jobs
    • POLL_BIGQUERY_JOB_URL: die URL der Funktion, die den Status eines laufenden Jobs abfragt
    • RUN_BIGQUERY_JOB_URL: die URL der Funktion zum Starten eines BigQuery-Ladejobs
    • CREATE_QUERY_URL: die URL der Funktion zum Starten eines BigQuery-Abfragejobs
    • BQ_REGION: die BigQuery-Region, in der Daten gespeichert sind, z. B. US
    • BQ_DATASET_TABLE_NAME: Der Name der BigQuery-Dataset-Tabelle im Format PROJECT_ID.serverless_elt_dataset.word_count
  2. Stellen Sie die Datei workflow bereit:

    gcloud workflows deploy WORKFLOW_NAME \
        --location=WORKFLOW_REGION \
        --description='WORKFLOW_DESCRIPTION' \
        --service-account=workflow-runner@PROJECT_ID.iam.gserviceaccount.com \
        --source=workflow.yaml
    

    Dabei gilt:

    • WORKFLOW_NAME: der eindeutige Name des Workflows
    • WORKFLOW_REGION: die Region, in der der Workflow bereitgestellt wird, z. B. us-central1
    • WORKFLOW_DESCRIPTION: die Beschreibung des Workflows
  3. Erstellen Sie eine virtuelle Python 3-Umgebung und installieren Sie die Installationsanforderungen für den Dateigenerator:

    sudo apt-get install -y python3-venv
    python3 -m venv env
    . env/bin/activate
    cd generator
    pip install -r requirements.txt
    

Zu importierende Dateien generieren

Das Python-Script gen.py generiert zufällige Inhalte im Avro-Format. Das Schema ist mit der BigQuery-Tabelle word_count identisch. Diese Avro-Dateien werden in den angegebenen Cloud Storage-Bucket kopiert.

Generieren Sie die Dateien in Cloud Shell:

python gen.py -p PROJECT_ID \
    -o PROJECT_ID-ordersbucket \
    -n RECORDS_PER_FILE \
    -f NUM_FILES \
    -x FILE_PREFIX

Dabei gilt:

  • RECORDS_PER_FILE: die Anzahl der Datensätze in einer einzelnen Datei
  • NUM_FILES: die Gesamtanzahl der hochzuladenden Dateien
  • FILE_PREFIX: das Präfix für die Namen der generierten Dateien

Dateieinträge in Firestore ansehen

Wenn die Dateien in Cloud Storage kopiert werden, wird die Cloud Run-Funktion handle_new_file ausgelöst. Diese Funktion fügt die Dateiliste dem Dateilistenarray im Dokument new in der Firestore-Sammlung jobs hinzu.

Rufen Sie in der Google Cloud Console die Seite Firestore-Daten auf, um sich die Dateiliste anzeigen zu lassen.

Zu den Daten

Liste der Dateien, die der Sammlung hinzugefügt wurden.

Workflow auslösen

Mit Workflows wird eine Reihe von serverlosen Aufgaben aus Google Cloud- und API-Diensten verknüpft. Einzelne Schritte in diesem Workflow werden als Cloud Run-Funktionen ausgeführt und der Status wird in Firestore gespeichert. Alle Aufrufe von Cloud Run-Funktionen werden mithilfe des Dienstkontos des Workflows authentifiziert.

Führen Sie den Workflow in Cloud Shell aus:

gcloud workflows execute WORKFLOW_NAME

Das folgende Diagramm zeigt die im Workflow verwendeten Schritte:

Schritte, die im Haupt- und untergeordneten Workflow verwendet werden.

Der Workflow ist in zwei Teile unterteilt: den Hauptworkflow und den untergeordneten Workflow. Der Hauptworkflow übernimmt die Joberstellung und die bedingte Ausführung, während der untergeordnete Workflow einen BigQuery-Job ausführt. Der Workflow führt folgende Vorgänge aus:

  • Die Cloud Run-Funktion create_job erstellt ein neues Jobobjekt, ruft die Liste der zu Cloud Storage hinzugefügten Dateien aus dem Firestore-Dokument ab und verknüpft die Dateien mit dem Ladejob. Wenn keine Dateien zum Laden vorhanden sind, erstellt die Funktion keinen neuen Job.
  • Die Cloud Run-Funktion create_query verwendet die auszuführende Abfrage zusammen mit der BigQuery-Region, in der die Abfrage ausgeführt werden soll. Die Funktion erstellt den Job in Firestore und gibt die Job-ID zurück.
  • Die Cloud Run-Funktion run_bigquery_job ruft die ID des Jobs ab, der ausgeführt werden muss, und ruft dann die BigQuery API auf, um den Job zu senden.
  • Statt auf den Abschluss des Jobs in der Cloud Run-Funktion zu warten, können Sie den Status des Jobs regelmäßig abfragen.
    • Die Cloud Run-Funktion poll_bigquery_job gibt den Status des Jobs an. Sie wird so lange wiederholt aufgerufen, bis der Job abgeschlossen ist.
    • Um Verzögerungen zwischen Aufrufen der Cloud Run-Funktion poll_bigquery_job hinzuzufügen, wird über Workflows eine sleep-Routine aufgerufen.

Jobstatus ansehen

Sie können sich die Dateiliste und den Status des Jobs ansehen.

  1. Rufen Sie in der Google Cloud Console die Seite Firestore-Daten auf.

    Zu den Daten

  2. Für jeden Job wird eine eindeutige Kennzeichnung (UUID) generiert. Klicken Sie auf die Job-ID, um job_type und status aufzurufen. Jeder Job kann einen der folgenden Typen und Status haben:

    • job_type: Der Typ des Jobs, der vom Workflow ausgeführt wird, mit einem der folgenden Werte:

      • 0: Daten in BigQuery laden
      • 1: Abfrage in BigQuery ausführen
    • status: der aktuelle Status des Jobs mit einem der folgenden Werte:

      • 0: Der Job wurde erstellt, aber nicht gestartet.
      • 1: Der Job wird ausgeführt.
      • 2: Der Job wurde erfolgreich ausgeführt.
      • 3: Es ist ein Fehler aufgetreten und der Job wurde nicht erfolgreich abgeschlossen.

    Das Jobobjekt enthält auch Metadatenattribute wie die Region des BigQuery-Datasets, den Namen der BigQuery-Tabelle und bei einem Abfragejob den ausgeführten Abfragestring.

Liste der Dateien mit hervorgehobenem Jobstatus.

Daten in BigQuery ansehen

Wenn Sie wissen möchten, ob der ELT-Job erfolgreich ausgeführt wurde, überprüfen Sie, ob die Daten in der Tabelle angezeigt werden.

  1. Rufen Sie in der Google Cloud Console die BigQuery-Seite Editor auf.

    Zum Editor

  2. Klicken Sie auf die Tabelle serverless_elt_dataset.word_count.

  3. Klicken Sie auf den Tab Preview (Vorschau).

    Tab "Vorschau" mit Daten in der Tabelle.

Workflow planen

Zur regelmäßigen Ausführung des Workflows nach Zeitplan können Sie Cloud Scheduler verwenden.

Bereinigen

Am einfachsten können Sie weitere Kosten vermeiden, wenn Sie das Google Cloud-Projekt löschen, das Sie für die Anleitung erstellt haben. Alternativ haben Sie die Möglichkeit, die einzelnen Ressourcen zu löschen.

Einzelne Ressourcen löschen

  1. Entfernen Sie in Cloud Shell alle mit Terraform erstellten Ressourcen:

    cd $HOME/bigquery-workflows-load
    terraform destroy \
    -var project_id=PROJECT_ID \
    -var region=REGION \
    -var zone=ZONE \
    --auto-approve
    
  2. Rufen Sie in der Google Cloud Console die Seite Firestore-Daten auf.

    Zu den Daten

  3. Klicken Sie neben Jobs auf  Menü und wählen Sie Löschen aus.

    Menüpfad zum Löschen einer Sammlung.

Projekt löschen

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Nächste Schritte