Flusso di lavoro con Cloud Composer

In questo documento utilizzi i seguenti componenti fatturabili di Google Cloud:

  • Dataproc
  • Compute Engine
  • Cloud Composer

Per generare una stima dei costi basata sull'utilizzo previsto, utilizza il Calcolatore prezzi. I nuovi utenti di Google Cloud potrebbero essere idonei per una prova gratuita.

Prima di iniziare

Configura il progetto

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc, Compute Engine, and Cloud Composer APIs.

    Enable the APIs

  5. Install the Google Cloud CLI.
  6. To initialize the gcloud CLI, run the following command:

    gcloud init
  7. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  8. Make sure that billing is enabled for your Google Cloud project.

  9. Enable the Dataproc, Compute Engine, and Cloud Composer APIs.

    Enable the APIs

  10. Install the Google Cloud CLI.
  11. To initialize the gcloud CLI, run the following command:

    gcloud init

Crea un modello di flusso di lavoro Dataproc

Copia ed esegui i comandi elencati di seguito in una finestra del terminale locale o in Cloud Shell per creare e definire un modello di flusso di lavoro.

  1. Crea il modello di flusso di lavoro sparkpi.
    gcloud dataproc workflow-templates create sparkpi \
        --region=us-central1
          
  2. Aggiungi il job Spark al modello di workflow sparkpi. Il flag step-id "compute" identifica il job SparkPi.
    gcloud dataproc workflow-templates add-job spark \
        --workflow-template=sparkpi \
        --step-id=compute \
        --class=org.apache.spark.examples.SparkPi \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --region=us-central1 \
        -- 1000
          
  3. Utilizza un cluster gestiti, a un solo nodo per eseguire il flusso di lavoro. Dataproc creerà il cluster, lo eseguirà e lo eliminerà al termine del flusso di lavoro.
    gcloud dataproc workflow-templates set-managed-cluster sparkpi \
        --cluster-name=sparkpi \
        --single-node \
        --region=us-central1
          
  4. Conferma la creazione del modello di flusso di lavoro.

    Console

    Fai clic sul nome sparkpi nella pagina Dataproc Flussi di lavoro nella console Google Cloud per aprire la pagina Dettagli del modello di flusso di lavoro. Fai clic sul nome del modello di flusso di lavoro per confermare gli attributi del modello sparkpi.

    Comando g-cloud

    Esegui questo comando:

    gcloud dataproc workflow-templates describe sparkpi --region=us-central1
        

Creare e caricare un DAG in Cloud Storage

  1. Crea o utilizza un ambiente Cloud Composer esistente.
  2. Imposta le variabili di ambiente.

    UI di Airflow

    1. Nella barra degli strumenti, fai clic su Amministrazione > Variabili.
    2. Fai clic su Crea.
    3. Inserisci le seguenti informazioni:
      • Chiave:project_id
      • Val: PROJECT_ID, il tuo ID progetto Google Cloud
    4. Fai clic su Salva.

    Comando g-cloud

    Inserisci i seguenti comandi:

    • ENVIRONMENT è il nome dell'ambiente Cloud Composer
    • LOCATION è la regione in cui si trova l'ambiente Cloud Composer
    • PROJECT_ID è l'ID del progetto che contiene l'ambiente Cloud Composer
        gcloud composer environments run ENVIRONMENT --location LOCATION variables set -- project_id PROJECT_ID
        
  3. Copia il seguente codice DAG localmente in un file denominato "composer-dataproc-dag.py", che utilizza DataprocInstantiateWorkflowTemplateOperator.

    Airflow 2

    
    """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
    Spark Pi Job.
    
    This DAG relies on an Airflow variable
    https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
    * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
    """
    
    import datetime
    
    from airflow import models
    from airflow.providers.google.cloud.operators.dataproc import (
        DataprocInstantiateWorkflowTemplateOperator,
    )
    from airflow.utils.dates import days_ago
    
    project_id = "{{var.value.project_id}}"
    
    
    default_args = {
        # Tell airflow to start one day ago, so that it runs as soon as you upload it
        "start_date": days_ago(1),
        "project_id": project_id,
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
        # The id you will see in the DAG airflow page
        "dataproc_workflow_dag",
        default_args=default_args,
        # The interval with which to schedule the DAG
        schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
    ) as dag:
        start_template_job = DataprocInstantiateWorkflowTemplateOperator(
            # The task id of your job
            task_id="dataproc_workflow_dag",
            # The template id of your workflow
            template_id="sparkpi",
            project_id=project_id,
            # The region for the template
            region="us-central1",
        )
    

    Airflow 1

    
    """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
    Spark Pi Job.
    
    This DAG relies on an Airflow variable
    https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
    * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
    """
    
    import datetime
    
    from airflow import models
    from airflow.contrib.operators import dataproc_operator
    from airflow.utils.dates import days_ago
    
    project_id = "{{var.value.project_id}}"
    
    
    default_args = {
        # Tell airflow to start one day ago, so that it runs as soon as you upload it
        "start_date": days_ago(1),
        "project_id": project_id,
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
        # The id you will see in the DAG airflow page
        "dataproc_workflow_dag",
        default_args=default_args,
        # The interval with which to schedule the DAG
        schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
    ) as dag:
        start_template_job = dataproc_operator.DataprocWorkflowTemplateInstantiateOperator(
            # The task id of your job
            task_id="dataproc_workflow_dag",
            # The template id of your workflow
            template_id="sparkpi",
            project_id=project_id,
            # The region for the template
            # For more info on regions where Dataflow is available see:
            # https://cloud.google.com/dataflow/docs/resources/locations
            region="us-central1",
        )
    
  4. Carica il DAG nella cartella dell'ambiente in Cloud Storage. Al termine del caricamento, fai clic sul link Cartella DAG nella pagina dell'ambiente Cloud Composer.

Visualizzare lo stato di un'attività

UI di Airflow

  1. Apri l'interfaccia web di Airflow.
  2. Nella pagina DAG, fai clic sul nome del DAG (ad esempio dataproc_workflow_dag).
  3. Nella pagina Dettagli DAG, fai clic su Visualizzazione grafico.
  4. Controlla lo stato:
    • Non riuscita: l'attività è racchiusa in un riquadro rosso. Puoi anche tenere premuto il cursore sopra l'attività e cercare Stato: non riuscito. L'attività è racchiusa in un riquadro rosso, a indicare che non è riuscita
    • Risultato positivo: l'attività è racchiusa in una casella verde. Puoi anche tenere premuto il cursore sopra l'attività e controllare se è presente lo stato Stato: completata. L'attività è racchiusa in una casella verde, che indica che è riuscita

Console

Fai clic sulla scheda Workflows per visualizzare lo stato del flusso di lavoro.

Comando g-cloud

gcloud dataproc operations list \
    --region=us-central1 \
    --filter="labels.goog-dataproc-workflow-template-id=sparkpi"
    

Pulizia

Per evitare che al tuo account Google Cloud vengano addebitati costi, puoi eliminare le risorse utilizzate in questo tutorial:

  1. Elimina l'ambiente Cloud Composer.

  2. Elimina il modello di flusso di lavoro.

Passaggi successivi