Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Auf dieser Seite wird beschrieben, wie Sie mit Cloud Composer 2 serverlose Dataproc-Nutzlasten inGoogle Cloudausführen.
In den Beispielen in den folgenden Abschnitten wird gezeigt, wie Sie Operatoren zum Verwalten von serverlosen Dataproc-Batcharbeitslasten verwenden. Sie verwenden diese Operatoren in DAGs, mit denen eine serverlose Dataproc Spark-Batch-Arbeitslast erstellt, gelöscht, aufgelistet und abgerufen wird:
Erstellen Sie DAGs für Operatoren, die mit serverlosen Dataproc-Batch-Arbeitslasten funktionieren:
DAGs erstellen, die benutzerdefinierte Container und Dataproc Metastore verwenden.
Konfigurieren Sie den Persistent History Server für diese DAGs.
Hinweise
Aktivieren Sie die Dataproc API:
Console
Enable the Dataproc API.
gcloud
Enable the Dataproc API:
gcloud services enable dataproc.googleapis.com
Wählen Sie den Speicherort für die Datei Ihrer Batcharbeitslast aus. Sie haben dazu folgende Möglichkeiten:
- Erstellen Sie einen Cloud Storage-Bucket, in dem diese Datei gespeichert wird.
- Verwenden Sie den Bucket Ihrer Umgebung. Da Sie diese Datei nicht mit Airflow synchronisieren müssen, können Sie einen separaten Unterordner außerhalb der Ordner
/dags
oder/data
erstellen. Beispiel:/batches
- Vorhandenen Bucket verwenden
Dateien und Airflow-Variablen einrichten
In diesem Abschnitt wird gezeigt, wie Sie Dateien einrichten und Airflow-Variablen für diese Anleitung konfigurieren.
Datei für eine serverlose Dataproc Spark ML-Arbeitslast in einen Bucket hochladen
Die Arbeitslast in dieser Anleitung führt ein PySpark-Skript aus:
Speichern Sie ein beliebiges PySpark-Script in einer lokalen Datei mit dem Namen
spark-job.py
. Sie können beispielsweise das Pyspark-Beispielskript verwenden.Laden Sie die Datei an den Speicherort hoch, den Sie unter Vorbereitung ausgewählt haben.
Airflow-Variablen festlegen
In den Beispielen in den folgenden Abschnitten werden Airflow-Variablen verwendet. Sie legen Werte für diese Variablen in Airflow fest. Ihr DAG-Code kann dann auf diese Werte zugreifen.
In den Beispielen in dieser Anleitung werden die folgenden Airflow-Variablen verwendet. Sie können sie je nach verwendetem Beispiel nach Bedarf festlegen.
Legen Sie die folgenden Airflow-Variablen für die Verwendung in Ihrem DAG-Code fest:
project_id
: Projekt-ID.bucket_name
: URI eines Buckets, in dem sich die Python-Hauptdatei der Arbeitslast (spark-job.py
) befindet. Sie haben diesen Speicherort unter Vorbereitung ausgewählt.phs_cluster
: Name des Clusters für den Persistent History Server. Sie legen diese Variable fest, wenn Sie einen Persistent History Server erstellen.image_name
: Name und Tag des benutzerdefinierten Container-Images (image:tag
). Sie legen diese Variable fest, wenn Sie ein benutzerdefiniertes Container-Image mit DataprocCreateBatchOperator verwenden.metastore_cluster
: Dataproc Metastore-Dienstname. Sie legen diese Variable fest, wenn Sie den Dataproc Metastore-Dienst mit DataprocCreateBatchOperator verwenden.region_name
: Region, in der sich der Dataproc Metastore-Dienst befindet. Sie legen diese Variable fest, wenn Sie den Dataproc Metastore-Dienst mit DataprocCreateBatchOperator verwenden.
Jede Airflow-Variable über die Google Cloud Konsole und die Airflow-Benutzeroberfläche festlegen
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Link Airflow für Ihre Umgebung. Die Airflow-UI wird geöffnet.
Wählen Sie in der Airflow-UI Admin > Variablen aus.
Klicken Sie auf Neuen Eintrag hinzufügen.
Geben Sie den Namen der Variable im Feld Schlüssel an und legen Sie den Wert dafür im Feld Wert fest.
Klicken Sie auf Speichern.
Persistent History Server erstellen
Verwenden Sie einen Persistent History Server (PHS), um Spark-Verlaufsdateien Ihrer Batcharbeitslasten aufzurufen:
- Persistent History Server erstellen
- Achten Sie darauf, dass Sie den Namen des PHS-Clusters in der
phs_cluster
Airflow-Variable angegeben haben.
DataprocCreateBatchOperator
Mit dem folgenden DAG wird eine serverlose Dataproc-Batch-Arbeitslast gestartet.
Weitere Informationen zu DataprocCreateBatchOperator
-Argumenten finden Sie im Quellcode des Operators.
Weitere Informationen zu Attributen, die Sie im Parameter batch
von DataprocCreateBatchOperator
übergeben können, finden Sie in der Beschreibung der Batch-Klasse.
Benutzerdefiniertes Container-Image mit DataprocCreateBatchOperator verwenden
Das folgende Beispiel zeigt, wie Sie ein benutzerdefiniertes Container-Image zum Ausführen Ihrer Arbeitslasten verwenden. Sie können beispielsweise einen benutzerdefinierten Container verwenden, um Python-Abhängigkeiten hinzuzufügen, die nicht vom Standard-Container-Image bereitgestellt werden.
So verwenden Sie ein benutzerdefiniertes Container-Image:
Benutzerdefiniertes Container-Image erstellen und in Container Registry hochladen
Geben Sie das Image in der
image_name
Airflow-Variable an.Verwenden Sie DataprocCreateBatchOperator mit Ihrem benutzerdefinierten Image:
Dataproc Metastore-Dienst mit DataprocCreateBatchOperator verwenden
So verwenden Sie einen Dataproc Metastore-Dienst in einem DAG:
Prüfen Sie, ob Ihr Metastore-Dienst bereits gestartet wurde.
Informationen zum Starten eines Metastore-Dienstes finden Sie unter Dataproc Metastore aktivieren und deaktivieren.
Ausführliche Informationen zum Batch-Operator zum Erstellen der Konfiguration finden Sie unter PeripheralsConfig.
Sobald der Metastore-Dienst ausgeführt wird, geben Sie seinen Namen in der
metastore_cluster
-Variablen und seine Region in derregion_name
-Airflow-Variablen an.Metastore-Dienst in DataprocCreateBatchOperator verwenden:
DataprocDeleteBatchOperator
Mit DataprocDeleteBatchOperator können Sie einen Batch anhand der Batch-ID der Arbeitslast löschen.
DataprocListBatchesOperator
DataprocDeleteBatchOperator listet Batches auf, die in einer bestimmten project_id und Region vorhanden sind.
DataprocGetBatchOperator
Mit DataprocGetBatchOperator wird eine bestimmte Batcharbeitslast abgerufen.