DAGs aus GitHub testen, synchronisieren und bereitstellen

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

In diesem Leitfaden wird beschrieben, wie Sie eine CI/CD-Pipeline erstellen, um DAGs aus Ihrem GitHub-Repository in Ihrer Cloud Composer-Umgebung zu testen, zu synchronisieren und bereitzustellen.

Wenn Sie nur Daten aus anderen Diensten synchronisieren möchten, lesen Sie den Hilfeartikel Daten aus anderen Diensten übertragen.

CI/CD-Pipeline – Übersicht

Architekturdiagramm mit den Schritten des Ablaufs. Die Vorabprüfung und die PR-Überprüfung befinden sich im GitHub-Abschnitt, die DAG-Synchronisierung und die manuelle DAG-Überprüfung im Abschnitt „ Google Cloud “.
Abbildung 1. Architekturdiagramm mit den Schritten des Ablaufs (zum Vergrößern klicken)

Die CI/CD-Pipeline zum Testen, Synchronisieren und Bereitstellen von DAGs umfasst die folgenden Schritte:

  1. Sie nehmen eine Änderung an einem DAG vor und übertragen diese Änderung per Push in einen Entwicklungszweig in Ihrem Repository.

  2. Sie öffnen eine Pull-Anfrage für den Hauptzweig Ihres Repositorys.

  3. Cloud Build führt Einheitentests aus, um zu prüfen, ob Ihr DAG gültig ist.

  4. Ihre Pull-Anfrage wurde genehmigt und in den Hauptzweig Ihres Repositorys zusammengeführt.

  5. Cloud Build synchronisiert Ihre Cloud Composer-Entwicklungsumgebung mit diesen neuen Änderungen.

  6. Sie prüfen, ob sich die DAG in Ihrer Entwicklungsumgebung wie erwartet verhält.

  7. Wenn Ihr DAG wie erwartet funktioniert, laden Sie ihn in Ihre Cloud Composer-Produktionsumgebung hoch.

Ziele

Hinweise

  • In dieser Anleitung wird davon ausgegangen, dass Sie mit zwei identischen Cloud Composer-Umgebungen arbeiten: einer Entwicklungsumgebung und einer Produktionsumgebung.

    In diesem Leitfaden konfigurieren Sie eine CI/CD-Pipeline nur für Ihre Entwicklungsumgebung. Achten Sie darauf, dass die verwendete Umgebung keine Produktionsumgebung ist.

  • In dieser Anleitung wird davon ausgegangen, dass Sie Ihre DAGs und die zugehörigen Tests in einem GitHub-Repository gespeichert haben.

    Die Beispiel-CI/CD-Pipeline veranschaulicht den Inhalt eines Beispiel-Repositorys. DAGs und Tests werden im Verzeichnis dags/ gespeichert. An der obersten Ebene befinden sich die Anforderungsdateien, die Datei mit den Einschränkungen und die Cloud Build-Konfigurationsdateien. Das DAG-Synchronisierungstool und seine Anforderungen befinden sich im Verzeichnis utils.

Vorabtestjob und Unit-Tests erstellen

Im ersten Cloud Build-Job wird eine Vorabprüfung ausgeführt, bei der Komponententests für Ihre DAGs ausgeführt werden.

Unittests hinzufügen

Wenn Sie es noch nicht getan haben, erstellen Sie Einheitentests für Ihre DAGs. Speichern Sie diese Tests zusammen mit den DAGs in Ihrem Repository, jeweils mit dem Suffix _test. Die Testdatei für den DAG in example_dag.py ist beispielsweise example_dag_test.py. Dies sind die Tests, die als Presubmit-Prüfung in Ihrem Repository ausgeführt werden.

Cloud Build-YAML-Konfiguration für den Vorabtest erstellen

Erstellen Sie in Ihrem Repository eine YAML-Datei mit dem Namen test-dags.cloudbuild.yaml, die Ihren Cloud Build-Job für Presubmit-Prüfungen konfiguriert. Dazu sind drei Schritte erforderlich:

  1. Installieren Sie die Abhängigkeiten, die für Ihre DAGs erforderlich sind.
  2. Installieren Sie die Abhängigkeiten, die für Ihre Unittests erforderlich sind.
  3. Führen Sie die DAG-Tests aus.

steps:
  # install dependencies
  - name: python:3.8-slim
    entrypoint: pip
    args: ["install", "-r", "requirements.txt", "-c", "constraints.txt", "--user"]

  - name: python:3.8-slim
    entrypoint: pip
    args: ["install", "-r", "requirements-test.txt", "--user"]

  # run in python 3.8 which is latest version in Cloud Composer
  - name: python:3.8-slim
    entrypoint: python3.8
    args: ["-m", "pytest", "-s", "dags/"]

Cloud Build-Trigger für den Vorabtest erstellen

Folgen Sie der Anleitung unter Repositories aus GitHub erstellen, um einen GitHub-App-basierten Trigger mit den folgenden Konfigurationen zu erstellen:

  • Name: test-dags

  • Ereignis: Pull-Anfrage

  • Quelle – Repository: Wählen Sie Ihr Repository aus.

  • Quelle – Basis-Branch: ^main$ (ändern Sie main bei Bedarf in den Namen des Basis-Branchs Ihres Repositorys)

  • Quelle – Kommentarsteuerung: nicht erforderlich

  • Build-Konfiguration – Cloud Build-Konfigurationsdatei: /test-dags.cloudbuild.yaml (der Pfad zu Ihrer Build-Datei)

DAG-Synchronisierungsjob erstellen und DAGs-Utility-Script hinzufügen

Konfigurieren Sie als Nächstes einen Cloud Build-Job, der ein DAGs-Hilfsskript ausführt. Das Utility-Script in diesem Job synchronisiert Ihre DAGs mit Ihrer Cloud Composer-Umgebung, nachdem sie in Ihrem Repository in den Hauptzweig zusammengeführt wurden.

DAGs-Hilfsskript hinzufügen

Fügen Sie Ihrem Repository das DAG-Utility-Skript hinzu. Mit diesem Hilfsskript werden alle DAG-Dateien im Verzeichnis dags/ Ihres Repositorys in ein temporäres Verzeichnis kopiert. Dabei werden alle Python-Dateien, die keine DAG-Dateien sind, ignoriert. Das Skript verwendet dann die Cloud Storage-Clientbibliothek, um alle Dateien aus diesem temporären Verzeichnis in das Verzeichnis dags/ im Bucket Ihrer Cloud Composer-Umgebung hochzuladen.

from __future__ import annotations

import argparse
import glob
import os
from shutil import copytree, ignore_patterns
import tempfile

# Imports the Google Cloud client library
from google.cloud import storage


def _create_dags_list(dags_directory: str) -> tuple[str, list[str]]:
    temp_dir = tempfile.mkdtemp()

    # ignore non-DAG Python files
    files_to_ignore = ignore_patterns("__init__.py", "*_test.py")

    # Copy everything but the ignored files to a temp directory
    copytree(dags_directory, f"{temp_dir}/", ignore=files_to_ignore, dirs_exist_ok=True)

    # The only Python files left in our temp directory are DAG files
    # so we can exclude all non Python files
    dags = glob.glob(f"{temp_dir}/*.py")
    return (temp_dir, dags)


def upload_dags_to_composer(
    dags_directory: str, bucket_name: str, name_replacement: str = "dags/"
) -> None:
    """
    Given a directory, this function moves all DAG files from that directory
    to a temporary directory, then uploads all contents of the temporary directory
    to a given cloud storage bucket
    Args:
        dags_directory (str): a fully qualified path to a directory that contains a "dags/" subdirectory
        bucket_name (str): the GCS bucket of the Cloud Composer environment to upload DAGs to
        name_replacement (str, optional): the name of the "dags/" subdirectory that will be used when constructing the temporary directory path name Defaults to "dags/".
    """
    temp_dir, dags = _create_dags_list(dags_directory)

    if len(dags) > 0:
        # Note - the GCS client library does not currently support batch requests on uploads
        # if you have a large number of files, consider using
        # the Python subprocess module to run gsutil -m cp -r on your dags
        # See https://cloud.google.com/storage/docs/gsutil/commands/cp for more info
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)

        for dag in dags:
            # Remove path to temp dir
            dag = dag.replace(f"{temp_dir}/", name_replacement)

            try:
                # Upload to your bucket
                blob = bucket.blob(dag)
                blob.upload_from_filename(dag)
                print(f"File {dag} uploaded to {bucket_name}/{dag}.")
            except FileNotFoundError:
                current_directory = os.listdir()
                print(
                    f"{name_replacement} directory not found in {current_directory}, you may need to override the default value of name_replacement to point to a relative directory"
                )
                raise

    else:
        print("No DAGs to upload.")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
        "--dags_directory",
        help="Relative path to the source directory containing your DAGs",
    )
    parser.add_argument(
        "--dags_bucket",
        help="Name of the DAGs bucket of your Composer environment without the gs:// prefix",
    )

    args = parser.parse_args()

    upload_dags_to_composer(args.dags_directory, args.dags_bucket)

Cloud Build-YAML-Konfiguration zum Synchronisieren von DAGs erstellen

Erstellen Sie in Ihrem Repository eine YAML-Datei mit dem Namen add-dags-to-composer.cloudbuild.yaml, in der Sie Ihren Cloud Build-Job zum Synchronisieren von DAGs konfigurieren. Er besteht aus zwei Schritten:

  1. Installieren Sie die Abhängigkeiten, die für das DAGs-Hilfsprogramm-Skript erforderlich sind.

  2. Führen Sie das Dienstprogramm-Skript aus, um die DAGs in Ihrem Repository mit Ihrer Cloud Composer-Umgebung zu synchronisieren.

steps:
  # install dependencies
  - name: python
    entrypoint: pip
    args: ["install", "-r", "utils/requirements.txt", "--user"]

  # run
  - name: python
    entrypoint: python
    args: ["utils/add_dags_to_composer.py", "--dags_directory=${_DAGS_DIRECTORY}", "--dags_bucket=${_DAGS_BUCKET}"]

Cloud Build-Trigger erstellen

Folgen Sie der Anleitung unter Repositories aus GitHub erstellen, um einen GitHub-App-basierten Trigger mit den folgenden Konfigurationen zu erstellen:

  • Name: add-dags-to-composer

  • Ereignis: Push zu Zweig

  • Quelle – Repository: Wählen Sie Ihr Repository aus.

  • Quelle – Basis-Branch: ^main$ (ändern Sie main bei Bedarf in den Namen des Basis-Branchs Ihres Repositorys)

  • Quelle – Filter für enthaltene Dateien (glob): dags/**

  • Build-Konfiguration – Cloud Build-Konfigurationsdatei: /add-dags-to-composer.cloudbuild.yaml (der Pfad zu Ihrer Build-Datei)

Fügen Sie in der erweiterten Konfiguration zwei Substitutionsvariablen hinzu:

  • _DAGS_DIRECTORY: Das Verzeichnis, in dem sich DAGs in Ihrem Repository befinden. Wenn Sie das Beispiel-Repository aus dieser Anleitung verwenden, ist es dags/.

  • _DAGS_BUCKET: Der Cloud Storage-Bucket, der das Verzeichnis dags/ in Ihrer Cloud Composer-Entwicklungsumgebung enthält. Lassen Sie das Präfix gs:// weg. Beispiel: us-central1-example-env-1234ab56-bucket.

CI/CD-Pipeline testen

In diesem Abschnitt folgen Sie einem DAG-Entwicklungsablauf, bei dem die neu erstellten Cloud Build-Trigger verwendet werden.

Presubmit-Job ausführen

Erstellen Sie eine Pull-Anfrage für Ihren Hauptzweig, um den Build zu testen. Suchen Sie auf der Seite nach dem Presubmit-Check. Klicken Sie auf Details und wählen Sie Weitere Details in Google Cloud Build anzeigen aus, um Ihre Build-Logs in derGoogle Cloud console anzusehen.

Screenshot einer GitHub-Prüfung namens „test-dags“ mit einem roten Pfeil, der auf den Projektnamen in Klammern zeigt
Abbildung 2. Screenshot des Status der Cloud Build-Vorabprüfung auf GitHub (zum Vergrößern klicken)

Wenn Ihre Presubmit-Prüfung fehlgeschlagen ist, lesen Sie den Abschnitt Build-Fehler beheben.

Prüfen, ob Ihr DAG in Ihrer Cloud Composer-Entwicklungsumgebung funktioniert

Nachdem Ihre Pull-Anfrage genehmigt wurde, führen Sie sie mit Ihrem Hauptzweig zusammen. Verwenden Sie dieGoogle Cloud Console, um Ihre Build-Ergebnisse anzusehen. Wenn Sie viele Cloud Build-Trigger haben, können Sie Ihre Builds nach dem Triggernamen add-dags-to-composer filtern.

Nachdem der Cloud Build-Synchronisierungsjob erfolgreich abgeschlossen wurde, wird der synchronisierte DAG in Ihrer Cloud Composer-Entwicklungsumgebung angezeigt. Dort können Sie prüfen, ob der DAG wie erwartet funktioniert.

DAG in die Produktionsumgebung einfügen

Wenn der DAG wie erwartet funktioniert, fügen Sie ihn manuell Ihrer Produktionsumgebung hinzu. Laden Sie dazu die DAG-Datei in das Verzeichnis dags/ im Bucket Ihrer Cloud Composer-Produktionsumgebung hoch.

Wenn Ihr DAG-Synchronisierungsjob fehlgeschlagen ist oder sich Ihr DAG in Ihrer Cloud Composer-Entwicklungsumgebung nicht wie erwartet verhält, lesen Sie den Abschnitt Build-Fehler beheben.

Build-Fehler beheben

In diesem Abschnitt wird erläutert, wie Sie häufige Szenarien für Build-Fehler beheben.

Was passiert, wenn die Vorabprüfung fehlschlägt?

Klicken Sie in Ihrem Pull-Request auf Details und wählen Sie Weitere Details in Google Cloud Build anzeigen aus, um Ihre Build-Logs in derGoogle Cloud -Konsole aufzurufen. Anhand dieser Logs können Sie das Problem mit Ihrem DAG beheben. Sobald Sie die Probleme behoben haben, übertragen Sie die Korrektur und pushen Sie sie in Ihren Branch. Die Vorabprüfung wird noch einmal ausgeführt und Sie können die Logs als Debugging-Tool verwenden, um weitere Änderungen vorzunehmen.

Was passiert, wenn mein DAG-Synchronisierungsjob fehlschlägt?

In der Google Cloud Console können Sie Ihre Build-Ergebnisse ansehen. Wenn Sie viele Cloud Build-Trigger haben, können Sie Ihre Builds nach dem Triggernamen add-dags-to-composer filtern. Sehen Sie sich die Logs des Build-Jobs an und beheben Sie die Fehler. Wenn Sie weitere Hilfe bei der Behebung der Fehler benötigen, nutzen Sie die Supportkanäle.

Was kann ich tun, wenn meine DAG in meiner Cloud Composer-Umgebung nicht richtig funktioniert?

Wenn Ihr DAG in Ihrer Cloud Composer-Entwicklungsumgebung nicht wie erwartet funktioniert, übertragen Sie ihn nicht manuell in Ihre Cloud Composer-Produktionsumgebung. Führen Sie stattdessen einen der folgenden Schritte aus:

  • Machen Sie den Pull-Request rückgängig, der die Änderungen enthält, die Ihre DAG beschädigt haben, um sie in den Zustand unmittelbar vor Ihren Änderungen zurückzusetzen. Dadurch werden auch alle anderen Dateien in diesem Pull-Request zurückgesetzt.
  • Erstellen Sie eine neue Pull-Anfrage, um die Änderungen am fehlerhaften DAG manuell rückgängig zu machen.
  • Erstellen Sie einen neuen Pull-Request, um die Fehler in Ihrem DAG zu beheben.

Wenn Sie einen dieser Schritte ausführen, wird ein neuer Presubmit-Check und beim Zusammenführen der DAG-Synchronisierungsjob ausgelöst.

Nächste Schritte