Dataproc Serverless-Arbeitslasten mit Cloud Composer ausführen

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Auf dieser Seite wird beschrieben, wie Sie mit Cloud Composer 2 serverlose Dataproc-Nutzlasten inGoogle Cloudausführen.

In den Beispielen in den folgenden Abschnitten wird gezeigt, wie Sie Operatoren zum Verwalten von serverlosen Dataproc-Batcharbeitslasten verwenden. Sie verwenden diese Operatoren in DAGs, mit denen eine serverlose Dataproc Spark-Batch-Arbeitslast erstellt, gelöscht, aufgelistet und abgerufen wird:

Hinweise

  1. Aktivieren Sie die Dataproc API:

    Console

    Enable the Dataproc API.

    Enable the API

    gcloud

    Enable the Dataproc API:

    gcloud services enable dataproc.googleapis.com

  2. Wählen Sie den Speicherort für die Datei Ihrer Batcharbeitslast aus. Sie haben dazu folgende Möglichkeiten:

    • Erstellen Sie einen Cloud Storage-Bucket, in dem diese Datei gespeichert wird.
    • Verwenden Sie den Bucket Ihrer Umgebung. Da Sie diese Datei nicht mit Airflow synchronisieren müssen, können Sie einen separaten Unterordner außerhalb der Ordner /dags oder /data erstellen. Beispiel: /batches
    • Vorhandenen Bucket verwenden

Dateien und Airflow-Variablen einrichten

In diesem Abschnitt wird gezeigt, wie Sie Dateien einrichten und Airflow-Variablen für diese Anleitung konfigurieren.

Datei für eine serverlose Dataproc Spark ML-Arbeitslast in einen Bucket hochladen

Die Arbeitslast in dieser Anleitung führt ein PySpark-Skript aus:

  1. Speichern Sie ein beliebiges PySpark-Script in einer lokalen Datei mit dem Namen spark-job.py. Sie können beispielsweise das Pyspark-Beispielskript verwenden.

  2. Laden Sie die Datei an den Speicherort hoch, den Sie unter Vorbereitung ausgewählt haben.

Airflow-Variablen festlegen

In den Beispielen in den folgenden Abschnitten werden Airflow-Variablen verwendet. Sie legen Werte für diese Variablen in Airflow fest. Ihr DAG-Code kann dann auf diese Werte zugreifen.

In den Beispielen in dieser Anleitung werden die folgenden Airflow-Variablen verwendet. Sie können sie je nach verwendetem Beispiel nach Bedarf festlegen.

Legen Sie die folgenden Airflow-Variablen für die Verwendung in Ihrem DAG-Code fest:

  • project_id: Projekt-ID.
  • bucket_name: URI eines Buckets, in dem sich die Python-Hauptdatei der Arbeitslast (spark-job.py) befindet. Sie haben diesen Speicherort unter Vorbereitung ausgewählt.
  • phs_cluster: Name des Clusters für den Persistent History Server. Sie legen diese Variable fest, wenn Sie einen Persistent History Server erstellen.
  • image_name: Name und Tag des benutzerdefinierten Container-Images (image:tag). Sie legen diese Variable fest, wenn Sie ein benutzerdefiniertes Container-Image mit DataprocCreateBatchOperator verwenden.
  • metastore_cluster: Dataproc Metastore-Dienstname. Sie legen diese Variable fest, wenn Sie den Dataproc Metastore-Dienst mit DataprocCreateBatchOperator verwenden.
  • region_name: Region, in der sich der Dataproc Metastore-Dienst befindet. Sie legen diese Variable fest, wenn Sie den Dataproc Metastore-Dienst mit DataprocCreateBatchOperator verwenden.

Jede Airflow-Variable über die Google Cloud Konsole und die Airflow-Benutzeroberfläche festlegen

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen

  2. Klicken Sie in der Liste der Umgebungen auf den Link Airflow für Ihre Umgebung. Die Airflow-UI wird geöffnet.

  3. Wählen Sie in der Airflow-UI Admin > Variablen aus.

  4. Klicken Sie auf Neuen Eintrag hinzufügen.

  5. Geben Sie den Namen der Variable im Feld Schlüssel an und legen Sie den Wert dafür im Feld Wert fest.

  6. Klicken Sie auf Speichern.

Persistent History Server erstellen

Verwenden Sie einen Persistent History Server (PHS), um Spark-Verlaufsdateien Ihrer Batcharbeitslasten aufzurufen:

  1. Persistent History Server erstellen
  2. Achten Sie darauf, dass Sie den Namen des PHS-Clusters in der phs_cluster Airflow-Variable angegeben haben.

DataprocCreateBatchOperator

Mit dem folgenden DAG wird eine serverlose Dataproc-Batch-Arbeitslast gestartet.

Weitere Informationen zu DataprocCreateBatchOperator-Argumenten finden Sie im Quellcode des Operators.

Weitere Informationen zu Attributen, die Sie im Parameter batch von DataprocCreateBatchOperator übergeben können, finden Sie in der Beschreibung der Batch-Klasse.


"""
Examples below show how to use operators for managing Dataproc Serverless batch workloads.
 You use these operators in DAGs that create, delete, list, and get a Dataproc Serverless Spark batch workload.
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* project_id is the Google Cloud Project ID to use for the Cloud Dataproc Serverless.
* bucket_name is the URI of a bucket where the main python file of the workload (spark-job.py) is located.
* phs_cluster is the Persistent History Server cluster name.
* image_name is the name and tag of the custom container image (image:tag).
* metastore_cluster is the Dataproc Metastore service name.
* region_name is the region where the Dataproc Metastore service is located.
"""

import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateBatchOperator,
    DataprocDeleteBatchOperator,
    DataprocGetBatchOperator,
    DataprocListBatchesOperator,
)
from airflow.utils.dates import days_ago

PROJECT_ID = "{{ var.value.project_id }}"
REGION = "{{ var.value.region_name}}"
BUCKET = "{{ var.value.bucket_name }}"
PHS_CLUSTER = "{{ var.value.phs_cluster }}"
METASTORE_CLUSTER = "{{var.value.metastore_cluster}}"
DOCKER_IMAGE = "{{var.value.image_name}}"

PYTHON_FILE_LOCATION = "gs://{{var.value.bucket_name }}/spark-job.py"
# for e.g.  "gs//my-bucket/spark-job.py"
# Start a single node Dataproc Cluster for viewing Persistent History of Spark jobs
PHS_CLUSTER_PATH = "projects/{{ var.value.project_id }}/regions/{{ var.value.region_name}}/clusters/{{ var.value.phs_cluster }}"
# for e.g. projects/my-project/regions/my-region/clusters/my-cluster"
SPARK_BIGQUERY_JAR_FILE = "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
# use this for those pyspark jobs that need a spark-bigquery connector
# https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example
# Start a Dataproc MetaStore Cluster
METASTORE_SERVICE_LOCATION = "projects/{{var.value.project_id}}/locations/{{var.value.region_name}}/services/{{var.value.metastore_cluster }}"
# for e.g. projects/my-project/locations/my-region/services/my-cluster
CUSTOM_CONTAINER = "us.gcr.io/{{var.value.project_id}}/{{ var.value.image_name}}"
# for e.g. "us.gcr.io/my-project/quickstart-image",

default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "project_id": PROJECT_ID,
    "region": REGION,
}
with models.DAG(
    "dataproc_batch_operators",  # The id you will see in the DAG airflow page
    default_args=default_args,  # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:
    create_batch = DataprocCreateBatchOperator(
        task_id="batch_create",
        batch={
            "pyspark_batch": {
                "main_python_file_uri": PYTHON_FILE_LOCATION,
                "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
            },
            "environment_config": {
                "peripherals_config": {
                    "spark_history_server_config": {
                        "dataproc_cluster": PHS_CLUSTER_PATH,
                    },
                },
            },
        },
        batch_id="batch-create-phs",
    )
    list_batches = DataprocListBatchesOperator(
        task_id="list-all-batches",
    )

    get_batch = DataprocGetBatchOperator(
        task_id="get_batch",
        batch_id="batch-create-phs",
    )
    delete_batch = DataprocDeleteBatchOperator(
        task_id="delete_batch",
        batch_id="batch-create-phs",
    )
    create_batch >> list_batches >> get_batch >> delete_batch

Benutzerdefiniertes Container-Image mit DataprocCreateBatchOperator verwenden

Das folgende Beispiel zeigt, wie Sie ein benutzerdefiniertes Container-Image zum Ausführen Ihrer Arbeitslasten verwenden. Sie können beispielsweise einen benutzerdefinierten Container verwenden, um Python-Abhängigkeiten hinzuzufügen, die nicht vom Standard-Container-Image bereitgestellt werden.

So verwenden Sie ein benutzerdefiniertes Container-Image:

  1. Benutzerdefiniertes Container-Image erstellen und in Container Registry hochladen

  2. Geben Sie das Image in der image_name Airflow-Variable an.

  3. Verwenden Sie DataprocCreateBatchOperator mit Ihrem benutzerdefinierten Image:

create_batch_with_custom_container = DataprocCreateBatchOperator(
    task_id="dataproc_custom_container",
    batch={
        "pyspark_batch": {
            "main_python_file_uri": PYTHON_FILE_LOCATION,
            "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
        },
        "environment_config": {
            "peripherals_config": {
                "spark_history_server_config": {
                    "dataproc_cluster": PHS_CLUSTER_PATH,
                },
            },
        },
        "runtime_config": {
            "container_image": CUSTOM_CONTAINER,
        },
    },
    batch_id="batch-custom-container",
)
get_batch_custom = DataprocGetBatchOperator(
    task_id="get_batch_custom",
    batch_id="batch-custom-container",
)
delete_batch_custom = DataprocDeleteBatchOperator(
    task_id="delete_batch_custom",
    batch_id="batch-custom-container",
)
create_batch_with_custom_container >> get_batch_custom >> delete_batch_custom

Dataproc Metastore-Dienst mit DataprocCreateBatchOperator verwenden

So verwenden Sie einen Dataproc Metastore-Dienst in einem DAG:

  1. Prüfen Sie, ob Ihr Metastore-Dienst bereits gestartet wurde.

    Informationen zum Starten eines Metastore-Dienstes finden Sie unter Dataproc Metastore aktivieren und deaktivieren.

    Ausführliche Informationen zum Batch-Operator zum Erstellen der Konfiguration finden Sie unter PeripheralsConfig.

  2. Sobald der Metastore-Dienst ausgeführt wird, geben Sie seinen Namen in der metastore_cluster-Variablen und seine Region in der region_name-Airflow-Variablen an.

  3. Metastore-Dienst in DataprocCreateBatchOperator verwenden:

create_batch_with_metastore = DataprocCreateBatchOperator(
    task_id="dataproc_metastore",
    batch={
        "pyspark_batch": {
            "main_python_file_uri": PYTHON_FILE_LOCATION,
            "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
        },
        "environment_config": {
            "peripherals_config": {
                "metastore_service": METASTORE_SERVICE_LOCATION,
                "spark_history_server_config": {
                    "dataproc_cluster": PHS_CLUSTER_PATH,
                },
            },
        },
    },
    batch_id="dataproc-metastore",
)
get_batch_metastore = DataprocGetBatchOperator(
    task_id="get_batch_metatstore",
    batch_id="dataproc-metastore",
)
delete_batch_metastore = DataprocDeleteBatchOperator(
    task_id="delete_batch_metastore",
    batch_id="dataproc-metastore",
)

create_batch_with_metastore >> get_batch_metastore >> delete_batch_metastore

DataprocDeleteBatchOperator

Mit DataprocDeleteBatchOperator können Sie einen Batch anhand der Batch-ID der Arbeitslast löschen.

delete_batch = DataprocDeleteBatchOperator(
    task_id="delete_batch",
    batch_id="batch-create-phs",
)

DataprocListBatchesOperator

DataprocDeleteBatchOperator listet Batches auf, die in einer bestimmten project_id und Region vorhanden sind.

list_batches = DataprocListBatchesOperator(
    task_id="list-all-batches",
)

DataprocGetBatchOperator

Mit DataprocGetBatchOperator wird eine bestimmte Batcharbeitslast abgerufen.

get_batch = DataprocGetBatchOperator(
    task_id="get_batch",
    batch_id="batch-create-phs",
)

Nächste Schritte