Optionale Dataproc Flink-Komponente

Sie können zusätzliche Komponenten wie Flink aktivieren, wenn Sie einen Dataproc-Cluster mit der Funktion Optionale Komponenten erstellen. Auf dieser Seite erfahren Sie, wie Sie einen Dataproc-Cluster mit der optionalen Komponente Apache Flink erstellen (ein Flink-Cluster) und dann Flink-Jobs auf dem Cluster ausführen.

Sie können Ihren Flink-Cluster für Folgendes verwenden:

  1. Flink-Jobs mit der Dataproc-Jobs-Ressource über die Google Cloud Console, die Google Cloud CLI oder die Dataproc API ausführen

  2. Flink-Jobs mit der flink CLI ausführen, die auf dem Masterknoten des Flink-Clusters ausgeführt wird.

  3. Apache Beam-Jobs in Flink ausführen

  4. Flink auf einem kerberisierten Cluster ausführen

Sie können die Google Cloud Console, die Google Cloud CLI oder die Dataproc API verwenden, um einen Dataproc-Cluster zu erstellen, in dem die Flink-Komponente aktiviert ist.

Empfehlung:Verwenden Sie einen Standard-VM-Cluster mit einem Master und der Flink-Komponente. Dataproc-Cluster im Modus für hohe Verfügbarkeit (mit 3 Master-VMs) unterstützen den Flink-Modus für hohe Verfügbarkeit nicht.

  • Über den Link Flink History Server im Component Gateway können Sie den Flink-Verlaufsserver aufrufen, der auf dem Flink-Cluster ausgeführt wird.
  • Verwenden Sie das YARN ResourceManager link im Component Gateway, um die Flink Job Manager-Weboberfläche aufzurufen, die auf dem Flink-Cluster ausgeführt wird .
  • Erstellen Sie einen Dataproc Persistent History Server, um Flink-Jobverlaufsdateien aufzurufen, die von vorhandenen und gelöschten Flink-Clustern geschrieben wurden.

Sie können Flink-Jobs mit der Dataproc-Ressource Jobs über die Google Cloud Console, die Google Cloud CLI oder die Dataproc API ausführen.

Console

So reichen Sie einen Beispiel-Flink-Wordcount-Job über die Console ein:

  1. Öffnen Sie die Dataproc-Seite Job senden in der Google Cloud Console in Ihrem Browser.

  2. Füllen Sie die Felder auf der Seite Job senden aus:

    1. Wählen Sie den Namen des Clusters aus der Clusterliste aus.
    2. Legen Sie für Job type (Jobtyp) den Wert Flink fest.
    3. Legen Sie für Main class or jar (Hauptklasse oder JAR-Datei) den Wert org.apache.flink.examples.java.wordcount.WordCount fest.
    4. Legen Sie für Jar-Dateien den Wert file:///usr/lib/flink/examples/batch/WordCount.jar fest.
      • file:/// steht für eine Datei im Cluster. WordCount.jar wurde von Dataproc beim Erstellen des Flink-Clusters installiert.
      • In diesem Feld ist auch ein Cloud Storage-Pfad (gs://BUCKET/JARFILE) oder ein Hadoop Distributed File System-Pfad (HDFS, hdfs://PATH_TO_JAR) zulässig.
  3. Klicken Sie auf Senden.

    • Die Job-Treiberausgabe wird auf der Seite Jobdetails angezeigt.
    • Flink-Jobs werden in der Google Cloud Console auf der Seite Dataproc-Jobs aufgeführt.
    • Klicken Sie auf der Seite Jobs oder Jobdetails auf Beenden oder Löschen, um einen Job zu beenden oder zu löschen.

gcloud

Wenn Sie einen Flink-Job an einen Dataproc-Flink-Cluster senden möchten, führen Sie den Befehl gcloud dataproc jobs submit lokal in einem Terminalfenster oder in Cloud Shell aus.

gcloud dataproc jobs submit flink \
    --cluster=CLUSTER_NAME \
    --region=REGION \
    --class=MAIN_CLASS \
    --jar=JAR_FILE \
    -- JOB_ARGS

Hinweise:

  • CLUSTER_NAME: Geben Sie den Namen des Dataproc-Flink-Clusters an, an den der Job gesendet werden soll.
  • REGION: Geben Sie eine Compute Engine-Region an, in der sich der Cluster befindet.
  • MAIN_CLASS: Geben Sie die main-Klasse Ihrer Flink-Anwendung an, z. B.:
    • org.apache.flink.examples.java.wordcount.WordCount
  • JAR_FILE: Geben Sie die JAR-Datei der Flink-Anwendung an. Sie können Folgendes angeben:
    • Eine JAR-Datei, die im Cluster mit dem Präfix file:///` installiert ist:
      • file:///usr/lib/flink/examples/streaming/TopSpeedWindowing.jar
      • file:///usr/lib/flink/examples/batch/WordCount.jar
    • Eine JAR-Datei in Cloud Storage: gs://BUCKET/JARFILE
    • Eine JAR-Datei in HDFS: hdfs://PATH_TO_JAR
  • JOB_ARGS: Optional können Sie nach dem doppelten Bindestrich (--) Jobargumente hinzufügen.

  • Nachdem Sie den Job gesendet haben, wird die Jobtreiberausgabe im lokalen oder Cloud Shell-Terminal angezeigt.

    Program execution finished
    Job with JobID 829d48df4ebef2817f4000dfba126e0f has finished.
    Job Runtime: 13610 ms
    ...
    (after,1)
    (and,12)
    (arrows,1)
    (ay,1)
    (be,4)
    (bourn,1)
    (cast,1)
    (coil,1)
    (come,1)

REST

In diesem Abschnitt wird gezeigt, wie Sie einen Flink-Job mithilfe der Dataproc API jobs.submit an einen Dataproc-Flink-Cluster senden.

Ersetzen Sie diese Werte in den folgenden Anfragedaten:

  • PROJECT_ID: ID des Google Cloud-Projekts.
  • REGION: Cluster-Region
  • CLUSTER_NAME: Geben Sie den Namen des Dataproc Flink-Clusters an, an den der Job gesendet werden soll.

HTTP-Methode und URL:

POST https://dataproc.googleapis.com/v1/projects/PROJECT_ID/regions/REGION/jobs:submit

JSON-Text anfordern:

{
  "job": {
    "placement": {
      "clusterName": "CLUSTER_NAME"
    },
    "flinkJob": {
      "mainClass": "org.apache.flink.examples.java.wordcount.WordCount",
      "jarFileUris": [
        "file:///usr/lib/flink/examples/batch/WordCount.jar"
      ]
    }
  }
}

Wenn Sie die Anfrage senden möchten, maximieren Sie eine der folgenden Optionen:

Sie sollten in etwa folgende JSON-Antwort erhalten:

{
  "reference": {
    "projectId": "PROJECT_ID",
    "jobId": "JOB_ID"
  },
  "placement": {
    "clusterName": "CLUSTER_NAME",
    "clusterUuid": "CLUSTER_UUID"
  },
  "flinkJob": {
    "mainClass": "org.apache.flink.examples.java.wordcount.WordCount",
    "args": [
      "1000"
    ],
    "jarFileUris": [
      "file:///usr/lib/flink/examples/batch/WordCount.jar"
    ]
  },
  "status": {
    "state": "PENDING",
    "stateStartTime": "2020-10-07T20:16:21.759Z"
  },
  "jobUuid": "JOB_UUID"
}
  • Flink-Jobs werden in der Google Cloud Console auf der Seite Dataproc-Jobs aufgeführt.
  • Sie können in der Google Cloud Console auf der Seite Jobs oder Jobdetails auf Beenden oder Löschen klicken, um einen Job zu beenden oder zu löschen.

Anstatt Flink-Jobs mit der Dataproc-Ressource Jobs auszuführen, können Sie Flink-Jobs mit der flink-Befehlszeile auf dem Masterknoten Ihres Flink-Clusters ausführen.

In den folgenden Abschnitten werden verschiedene Möglichkeiten beschrieben, wie Sie einen flink-Befehlszeilenjob in Ihrem Dataproc-Flink-Cluster ausführen können.

  1. SSH-Verbindung zum Masterknoten herstellen:Verwenden Sie das SSH, um ein Terminalfenster auf der Clustermaster-VM zu öffnen.

  2. Classpath festlegen:Initialisieren Sie den Hadoop-Classpath über das SSH-Terminalfenster auf der Flink-Cluster-Master-VM:

    export HADOOP_CLASSPATH=$(hadoop classpath)
    
  3. Flink-Jobs ausführen:Sie können Flink-Jobs in verschiedenen Bereitstellungsmodi in YARN ausführen: Anwendungs-, Job- und Sitzungsmodus.

    1. Anwendungsmodus:Der Flink-Anwendungsmodus wird ab der Dataproc-Image-Version 2.0 unterstützt. In diesem Modus wird die main()-Methode des Jobs im YARN-Jobmanager ausgeführt. Der Cluster wird nach Abschluss des Jobs heruntergefahren.

      Beispiel für die Jobsendung:

      flink run-application \
          -t yarn-application \
          -Djobmanager.memory.process.size=1024m \
          -Dtaskmanager.memory.process.size=2048m \
          -Djobmanager.heap.mb=820 \
          -Dtaskmanager.heap.mb=1640 \
          -Dtaskmanager.numberOfTaskSlots=2 \
          -Dparallelism.default=4 \
          /usr/lib/flink/examples/batch/WordCount.jar
      

      Laufende Jobs auflisten:

      ./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
      

      So brechen Sie einen laufenden Job ab:

      ./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
      
    2. Jobspezifischer Modus:In diesem Flink-Modus wird die main()-Methode des Jobs auf der Clientseite ausgeführt.

      Beispiel für die Jobsendung:

      flink run \
          -m yarn-cluster \
          -p 4 \
          -ys 2 \
          -yjm 1024m \
          -ytm 2048m \
          /usr/lib/flink/examples/batch/WordCount.jar
      
    3. Sitzungsmodus:Starten Sie eine lang andauernde Flink-YARN-Sitzung und reichen Sie dann einen oder mehrere Jobs für die Sitzung ein.

      1. Sitzung starten:Sie haben folgende Möglichkeiten, eine Flink-Sitzung zu starten:

        1. Erstellen Sie einen Flink-Cluster, indem Sie dem Befehl gcloud dataproc clusters create das Flag --metadata flink-start-yarn-session=true hinzufügen. Weitere Informationen finden Sie unter Dataproc-Flink-Cluster erstellen. Wenn dieses Flag aktiviert ist, führt Dataproc nach dem Erstellen des Clusters /usr/bin/flink-yarn-daemon aus, um eine Flink-Sitzung auf dem Cluster zu starten.

          Die YARN-Anwendungs-ID der Sitzung wird in /tmp/.yarn-properties-${USER} gespeichert. Sie können die ID mit dem Befehl yarn application -list auflisten.

        2. Führen Sie das Flink-Skript yarn-session.sh aus, das auf der Cluster-Master-VM vorinstalliert ist, mit benutzerdefinierten Einstellungen aus:

          Beispiel mit benutzerdefinierten Einstellungen:

          /usr/lib/flink/bin/yarn-session.sh \
              -s 1 \
              -jm 1024m \
              -tm 2048m \
              -nm flink-dataproc \
              --detached
          
        3. Führen Sie das Flink-/usr/bin/flink-yarn-daemon-Wrapper-Script mit den Standardeinstellungen aus:

          . /usr/bin/flink-yarn-daemon
          
      2. Job an eine Sitzung senden:Führen Sie den folgenden Befehl aus, um einen Flink-Job an die Sitzung zu senden.

        flink run -m <var>FLINK_MASTER_URL</var>/usr/lib/flink/examples/batch/WordCount.jar
        
        • FLINK_MASTER_URL: die URL, einschließlich Host und Port, der Flink-Master-VM, auf der Jobs ausgeführt werden. Entfernen Sie das http:// prefix aus der URL. Diese URL wird in der Befehlsausgabe aufgeführt, wenn Sie eine Flink-Sitzung starten. Mit dem folgenden Befehl können Sie diese URL im Feld Tracking-URL auflisten:
        yarn application -list -appId=<yarn-app-id> | sed 's#http://##'
           ```
        
      3. Jobs in einer Sitzung auflisten:So listen Sie Flink-Jobs in einer Sitzung auf:

        • Führen Sie flink list ohne Argumente aus. Der Befehl sucht in /tmp/.yarn-properties-${USER} nach der YARN-Anwendungs-ID der Sitzung.

        • Rufen Sie die YARN-Anwendungs-ID der Sitzung aus /tmp/.yarn-properties-${USER} oder der Ausgabe von yarn application -list ab und führen Sie dann <code>flink list -yid YARN_APPLICATION_ID aus.

        • Führen Sie flink list -m FLINK_MASTER_URL aus.

      4. Sitzung beenden:Um die Sitzung zu beenden, rufen Sie die YARN-Anwendungs-ID der Sitzung aus /tmp/.yarn-properties-${USER} oder der Ausgabe von yarn application -list ab und führen Sie dann einen der folgenden Befehle aus:

        echo "stop" | /usr/lib/flink/bin/yarn-session.sh -id YARN_APPLICATION_ID
        
        yarn application -kill YARN_APPLICATION_ID
        

Sie können Apache Beam-Jobs in Dataproc mit dem FlinkRunner ausführen.

Sie können Beam-Jobs auf Flink auf folgende Weise ausführen:

  1. Java Beam-Jobs
  2. Portierbare Beam-Jobs

Java Beam-Jobs

Verpacken Sie Ihre Beam-Jobs in eine JAR-Datei. Geben Sie die gebündelte JAR-Datei mit den Abhängigkeiten an, die zum Ausführen des Jobs erforderlich sind.

Im folgenden Beispiel wird ein Java Beam-Job vom Masterknoten des Dataproc-Clusters ausgeführt.

  1. Erstellen Sie einen Dataproc-Cluster mit aktivierter Flink-Komponente.

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    
    • --optional-components: Flink.
    • --image-version: Die Image-Version des Clusters, die die auf dem Cluster installierte Flink-Version bestimmt (Beispiel: Siehe die aufgelisteten Apache Flink-Komponentenversionen für die neueste und vorherige Version von vier 2.0.x-Image-Releaseversionen).
    • --region: eine unterstützte Dataproc-Region.
    • --enable-component-gateway: Aktiviert den Zugriff auf die Benutzeroberfläche des Flink Job Manager.
    • --scopes: Damit wird der Zugriff Ihres Clusters auf Google Cloud APIs aktiviert (siehe Best Practices für Bereiche). Der Bereich cloud-platform ist standardmäßig aktiviert (Sie müssen diese Flag-Einstellung nicht angeben), wenn Sie einen Cluster erstellen, für den die Dataproc-Image-Version 2.1 oder höher verwendet wird.
  2. Verwenden Sie das SSH-Dienstprogramm, um ein Terminalfenster auf dem Masterknoten des Flink-Clusters zu öffnen.

  3. Starten Sie eine Flink YARN-Sitzung auf dem Masterknoten des Dataproc-Clusters.

    . /usr/bin/flink-yarn-daemon
    

    Notieren Sie sich die Flink-Version auf Ihrem Dataproc-Cluster.

    flink --version
    
  4. Erstellen Sie auf Ihrem lokalen Computer das Beispiel für die kanonische Beam-Zählung in Java.

    Wählen Sie eine Beam-Version aus, die mit der Flink-Version auf Ihrem Dataproc-Cluster kompatibel ist. In der Tabelle Kompatibilität mit der Flink-Version sind die Versionen der Beam-Flink-Version aufgeführt.

    Öffnen Sie die generierte POM-Datei. Prüfen Sie die Version des Beam-Flink-Runners, die mit dem Tag <flink.artifact.name> angegeben wird. Wenn die Beam-Flink-Runner-Version im Flink-Artefaktnamen nicht mit der Flink-Version auf Ihrem Cluster übereinstimmt, aktualisieren Sie die Versionsnummer entsprechend.

    mvn archetype:generate \
        -DarchetypeGroupId=org.apache.beam \
        -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
        -DarchetypeVersion=BEAM_VERSION \
        -DgroupId=org.example \
        -DartifactId=word-count-beam \
        -Dversion="0.1" \
        -Dpackage=org.apache.beam.examples \
        -DinteractiveMode=false
    
  5. Beispiel für das Verpacken der Wortzahl.

    mvn package -Pflink-runner
    
  6. Laden Sie die Uber-JAR-Paketdatei, word-count-beam-bundled-0.1.jar, (ca. 135 MB) auf den Masterknoten des Dataproc-Clusters hoch. Sie können gcloud storage cp für eine schnellere Dateiübertragung von Cloud Storage zu Ihrem Dataproc-Cluster verwenden.

    1. Erstellen Sie auf Ihrem lokalen Terminal einen Cloud Storage-Bucket und laden Sie die Uber-JAR-Datei hoch.

      gcloud storage buckets create BUCKET_NAME
      
      gcloud storage cp target/word-count-beam-bundled-0.1.jar gs://BUCKET_NAME/
      
    2. Laden Sie auf dem Masterknoten von Dataproc die Uber-JAR-Datei herunter.

      gcloud storage cp gs://BUCKET_NAME/word-count-beam-bundled-0.1.jar .
      
  7. Führen Sie den Java Beam-Job auf dem Masterknoten des Dataproc-Clusters aus.

    flink run -c org.apache.beam.examples.WordCount word-count-beam-bundled-0.1.jar \
        --runner=FlinkRunner \
        --output=gs://BUCKET_NAME/java-wordcount-out
    
  8. Prüfen Sie, ob die Ergebnisse in Ihren Cloud Storage-Bucket geschrieben wurden.

    gcloud storage cat gs://BUCKET_NAME/java-wordcount-out-SHARD_ID
    
  9. Beenden Sie die Flink YARN-Sitzung.

    yarn application -list
    
    yarn application -kill YARN_APPLICATION_ID
    

Portierbare Beam-Jobs

Zum Ausführen von in Python, Go und anderen unterstützten Sprachen geschriebenen Beam-Jobs können Sie FlinkRunner und PortableRunner verwenden, wie im Flink-Runner des Beams beschrieben (siehe auch Portability Framework Roadmap).

Im folgenden Beispiel wird ein portabler Beam-Job in Python aus dem Masterknoten des Dataproc-Clusters ausgeführt.

  1. Erstellen Sie einen Dataproc-Cluster, bei dem sowohl die Flink- als auch die Docker-Komponenten aktiviert sind.

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK,DOCKER \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    

    Hinweise:

    • --optional-components: Flink und Docker
    • --image-version: Die Image-Version des Clusters, die die auf dem Cluster installierte Flink-Version bestimmt (siehe die Apache Flink-Komponentenversionen für die letzten und vorherigen vier 2.0.x-Image-Releaseversionen).
    • --region: Eine verfügbare Dataproc-Region.
    • --enable-component-gateway: Aktivieren Sie den Zugriff auf die Benutzeroberfläche des Flink Job Managers.
    • --scopes: Aktivieren Sie den Zugriff Ihres Clusters auf Google Cloud APIs (siehe Best Practices für Bereiche). Der Bereich cloud-platform ist standardmäßig aktiviert (Sie müssen diese Flag-Einstellung nicht angeben), wenn Sie einen Cluster erstellen, für den die Dataproc-Image-Version 2.1 oder höher verwendet wird.
  2. Verwenden Sie die gcloud CLI lokal oder in Cloud Shell, um einen Cloud Storage-Bucket zu erstellen. Sie geben BUCKET_NAME an, wenn Sie ein Beispielprogramm für die Wortzählung ausführen.

    gcloud storage buckets create BUCKET_NAME
    
  3. Starten Sie in einem Terminalfenster auf der Cluster-VM eine Flink-YARN-Sitzung. Notieren Sie sich die Flink-Master-URL, die Adresse des Flink-Masters, auf dem Jobs ausgeführt werden. Sie geben FLINK_MASTER_URL an, wenn Sie ein Beispielprogramm für die Wortanzahl ausführen.

    . /usr/bin/flink-yarn-daemon
    

    Rufen Sie die Flink-Version auf, die auf dem Dataproc-Cluster ausgeführt wird, und notieren Sie sich diese. Sie geben FLINK_VERSION an, wenn Sie ein Beispielprogramm für die Wortanzahl ausführen.

    flink --version
    
  4. Installieren Sie die für den Job erforderlichen Python-Bibliotheken auf dem Clustermasterknoten.

  5. Installieren Sie eine Beam-Version, die mit der Flink-Version im Cluster kompatibel ist.

    python -m pip install apache-beam[gcp]==BEAM_VERSION
    
  6. Führen Sie das Beispiel für die Wortzählung auf dem Masterknoten des Clusters aus.

    python -m apache_beam.examples.wordcount \
        --runner=FlinkRunner \
        --flink_version=FLINK_VERSION \
        --flink_master=FLINK_MASTER_URL
        --flink_submit_uber_jar \
        --output=gs://BUCKET_NAME/python-wordcount-out
    

    Hinweise:

    • --runner: FlinkRunner.
    • --flink_version: FLINK_VERSION, wie oben angegeben.
    • --flink_master: FLINK_MASTER_URL, wie oben angegeben.
    • --flink_submit_uber_jar: Verwenden Sie die Uber-JAR-Datei, um den Beam-Job auszuführen.
    • --output: BUCKET_NAME, das Sie zuvor erstellt haben.
  7. Prüfen Sie, ob die Ergebnisse in Ihren Bucket geschrieben wurden.

    gcloud storage cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID
    
  8. Beenden Sie die Flink YARN-Sitzung.

    1. Rufen Sie die Anwendungs-ID ab.
    yarn application -list
    
    1. Insert the <var>YARN_APPLICATION_ID</var>, then stop the session.
    
    yarn application -kill 
    

Die Dataproc Flink-Komponente unterstützt mit Kerberos gesicherte Cluster. Ein gültiges Kerberos-Ticket ist erforderlich, um einen Flink-Job zu senden und beizubehalten oder einen Flink-Cluster zu starten. Standardmäßig bleibt ein Kerberos-Ticket sieben Tage lang gültig.

Die Flink Job Manager-Weboberfläche ist verfügbar, während ein Flink-Job oder ein Flink-Sitzungscluster ausgeführt wird. So verwenden Sie die Weboberfläche:

  1. Erstellen Sie einen Dataproc-Flink-Cluster.
  2. Klicken Sie nach der Clustererstellung in der Google Cloud Console auf der Seite Clusterdetails auf den Tab der Web-Benutzeroberfläche und auf das Component Gateway YARN ResourceManager-Link.
  3. Ermitteln Sie auf der Benutzeroberfläche von YARN Resource Manager den Anwendungseintrag des Flink-Clusters. Je nach Abschlussstatus des Jobs wird ein Link zu ApplicationMaster oder History angezeigt.
  4. Bei einem Streamingjob mit langer Ausführungszeit klicken Sie auf den Link ApplicationManager, um das Flink-Dashboard zu öffnen. Klicken Sie für einen abgeschlossenen Job auf den Link History (Verlauf), um Jobdetails aufzurufen.