Optionale Dataproc Flink-Komponente

Sie können zusätzliche Komponenten wie Flink aktivieren, wenn Sie einen Dataproc-Cluster mit dem Feature Optionale Komponenten erstellen. Auf dieser Seite wird beschrieben, wie Sie einen Dataproc-Cluster mit der optionalen Apache Flink-Komponente (ein Flink-Cluster) erstellen und dann Flink-Jobs im Cluster ausführen.

Sie können Ihren Flink-Cluster für Folgendes verwenden:

  1. Flink-Jobs mit der Dataproc-Ressource Jobs ausführen – über die Google Cloud -Konsole, die Google Cloud CLI oder die Dataproc API.

  2. Flink-Jobs mit der flink-Befehlszeile ausführen, die auf dem Masterknoten des Flink-Clusters ausgeführt wird.

  3. Apache Beam-Jobs in Flink ausführen

  4. Flink auf einem kerberisierten Cluster ausführen

Sie können die Google Cloud -Konsole, die Google Cloud CLI oder die Dataproc API verwenden, um einen Dataproc-Cluster zu erstellen, auf dem die Flink-Komponente aktiviert ist.

Empfehlung:Verwenden Sie einen Standardcluster mit einer Master-VM und der Flink-Komponente. Dataproc-Cluster im Modus für hohe Verfügbarkeit (mit 3 Master-VMs) unterstützen den Flink-Modus für hohe Verfügbarkeit nicht.

  • Verwenden Sie den Flink History Server-Link im Component Gateway, um den Flink-Verlaufsserver aufzurufen, der im Flink-Cluster ausgeführt wird.
  • Verwenden Sie YARN ResourceManager link im Component Gateway, um die Flink Job Manager-Weboberfläche auf dem Flink-Cluster aufzurufen .
  • Erstellen Sie einen Dataproc Persistent History Server, um Flink-Jobverlaufsdateien aufzurufen, die von vorhandenen und gelöschten Flink-Clustern geschrieben wurden.

Sie können Flink-Jobs mit der Dataproc-Ressource Jobs über dieGoogle Cloud Console, die Google Cloud CLI oder die Dataproc API ausführen.

Console

So reichen Sie einen Flink-Beispieljob für die Wortzählung über die Console ein:

  1. Öffnen Sie die Dataproc-Seite Job senden in derGoogle Cloud Console in Ihrem Browser.

  2. Füllen Sie die Felder auf der Seite Job senden aus:

    1. Wählen Sie den Namen des Clusters aus der Clusterliste aus.
    2. Legen Sie für Job type (Jobtyp) den Wert Flink fest.
    3. Legen Sie für Main class or jar (Hauptklasse oder JAR-Datei) den Wert org.apache.flink.examples.java.wordcount.WordCount fest.
    4. Legen Sie für Jar-Dateien den Wert file:///usr/lib/flink/examples/batch/WordCount.jar fest.
      • file:/// steht für eine Datei, die sich im Cluster befindet. WordCount.jar wurde von Dataproc installiert, als der Flink-Cluster erstellt wurde.
      • In diesem Feld kann auch ein Cloud Storage-Pfad (gs://BUCKET/JARFILE) oder ein Hadoop Distributed File System-Pfad (HDFS-Pfad) (hdfs://PATH_TO_JAR) angegeben werden.
  3. Klicken Sie auf Senden.

    • Die Job-Treiberausgabe wird auf der Seite Jobdetails angezeigt.
    • Flink-Jobs werden in der Google Cloud -Console auf der Dataproc-Seite Jobs aufgeführt.
    • Klicken Sie auf der Seite Jobs oder Jobdetails auf Beenden oder Löschen, um einen Job zu beenden oder zu löschen.

gcloud

Zum Senden eines Flink-Jobs an einen Dataproc-Flink-Cluster führen Sie den gcloud CLI-Befehl gcloud dataproc jobs submit lokal in einem Terminalfenster oder in Cloud Shell aus.

gcloud dataproc jobs submit flink \
    --cluster=CLUSTER_NAME \
    --region=REGION \
    --class=MAIN_CLASS \
    --jar=JAR_FILE \
    -- JOB_ARGS

Hinweise:

  • CLUSTER_NAME: Geben Sie den Namen des Dataproc Flink-Clusters an, an den der Job gesendet werden soll.
  • REGION: Geben Sie eine Compute Engine-Region an, in der sich der Cluster befindet.
  • MAIN_CLASS: Geben Sie die main-Klasse Ihrer Flink-Anwendung an, z. B.:
    • org.apache.flink.examples.java.wordcount.WordCount
  • JAR_FILE: Geben Sie die JAR-Datei der Flink-Anwendung an. Sie können Folgendes angeben:
    • Eine JAR-Datei, die auf dem Cluster installiert ist, mit dem Präfix file:///` :
      • file:///usr/lib/flink/examples/streaming/TopSpeedWindowing.jar
      • file:///usr/lib/flink/examples/batch/WordCount.jar
    • Eine JAR-Datei in Cloud Storage: gs://BUCKET/JARFILE
    • Eine JAR-Datei in HDFS: hdfs://PATH_TO_JAR
  • JOB_ARGS: Optional können Sie nach dem doppelten Bindestrich (--) Jobargumente hinzufügen.

  • Nachdem Sie den Job gesendet haben, wird die Jobtreiberausgabe im lokalen oder Cloud Shell-Terminal angezeigt.

    Program execution finished
    Job with JobID 829d48df4ebef2817f4000dfba126e0f has finished.
    Job Runtime: 13610 ms
    ...
    (after,1)
    (and,12)
    (arrows,1)
    (ay,1)
    (be,4)
    (bourn,1)
    (cast,1)
    (coil,1)
    (come,1)

REST

In diesem Abschnitt wird gezeigt, wie Sie einen Flink-Job an einen Dataproc-Flink-Cluster senden. Dazu verwenden Sie die Dataproc API jobs.submit.

Ersetzen Sie diese Werte in den folgenden Anfragedaten:

  • PROJECT_ID: Google Cloud Projekt-ID
  • REGION: Cluster-Region
  • CLUSTER_NAME: Geben Sie den Namen des Dataproc Flink-Clusters an, an den der Job gesendet werden soll.

HTTP-Methode und URL:

POST https://dataproc.googleapis.com/v1/projects/PROJECT_ID/regions/REGION/jobs:submit

JSON-Text anfordern:

{
  "job": {
    "placement": {
      "clusterName": "CLUSTER_NAME"
    },
    "flinkJob": {
      "mainClass": "org.apache.flink.examples.java.wordcount.WordCount",
      "jarFileUris": [
        "file:///usr/lib/flink/examples/batch/WordCount.jar"
      ]
    }
  }
}

Wenn Sie die Anfrage senden möchten, maximieren Sie eine der folgenden Optionen:

Sie sollten eine JSON-Antwort ähnlich wie diese erhalten:

{
  "reference": {
    "projectId": "PROJECT_ID",
    "jobId": "JOB_ID"
  },
  "placement": {
    "clusterName": "CLUSTER_NAME",
    "clusterUuid": "CLUSTER_UUID"
  },
  "flinkJob": {
    "mainClass": "org.apache.flink.examples.java.wordcount.WordCount",
    "args": [
      "1000"
    ],
    "jarFileUris": [
      "file:///usr/lib/flink/examples/batch/WordCount.jar"
    ]
  },
  "status": {
    "state": "PENDING",
    "stateStartTime": "2020-10-07T20:16:21.759Z"
  },
  "jobUuid": "JOB_UUID"
}
  • Flink-Jobs werden in der Google Cloud -Console auf der Dataproc-Seite Jobs aufgeführt.
  • Sie können einen Job beenden oder löschen, indem Sie in der Google Cloud -Konsole auf der Seite Jobs oder Jobdetails auf Beenden oder Löschen klicken.

Anstatt Flink-Jobs mit der Dataproc-Ressource Jobs auszuführen, können Sie Flink-Jobs auf dem Masterknoten Ihres Flink-Clusters mit der flink-Befehlszeile ausführen.

In den folgenden Abschnitten werden verschiedene Möglichkeiten zum Ausführen eines flink-CLI-Jobs in Ihrem Dataproc Flink-Cluster beschrieben.

  1. SSH-Verbindung zum Masterknoten herstellen:Verwenden Sie das SSH-Dienstprogramm, um ein Terminalfenster auf der Master-VM des Clusters zu öffnen.

  2. Classpath festlegen:Initialisieren Sie den Hadoop-Classpath über das SSH-Terminalfenster auf der Master-VM des Flink-Clusters:

    export HADOOP_CLASSPATH=$(hadoop classpath)
    
  3. Flink-Jobs ausführen:Sie können Flink-Jobs in verschiedenen Bereitstellungsmodi in YARN ausführen: Anwendungs-, Job- und Sitzungsmodus.

    1. Anwendungsmodus:Der Flink-Anwendungsmodus wird ab der Dataproc-Image-Version 2.0 unterstützt. In diesem Modus wird die Methode main() des Jobs im YARN Job Manager ausgeführt. Der Cluster wird nach Abschluss des Jobs heruntergefahren.

      Beispiel für das Senden eines Jobs:

      flink run-application \
          -t yarn-application \
          -Djobmanager.memory.process.size=1024m \
          -Dtaskmanager.memory.process.size=2048m \
          -Djobmanager.heap.mb=820 \
          -Dtaskmanager.heap.mb=1640 \
          -Dtaskmanager.numberOfTaskSlots=2 \
          -Dparallelism.default=4 \
          /usr/lib/flink/examples/batch/WordCount.jar
      

      Laufende Jobs auflisten:

      ./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
      

      So brechen Sie einen laufenden Job ab:

      ./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
      
    2. Modus „Pro Job“:In diesem Flink-Modus wird die Methode main() des Jobs auf der Clientseite ausgeführt.

      Beispiel für das Senden eines Jobs:

      flink run \
          -m yarn-cluster \
          -p 4 \
          -ys 2 \
          -yjm 1024m \
          -ytm 2048m \
          /usr/lib/flink/examples/batch/WordCount.jar
      
    3. Sitzungsmodus:Starten Sie eine lang andauernde Flink-YARN-Sitzung und senden Sie dann einen oder mehrere Jobs an die Sitzung.

      1. Sitzung starten:Sie haben folgende Möglichkeiten, eine Flink-Sitzung zu starten:

        1. Erstellen Sie einen Flink-Cluster und fügen Sie dem Befehl gcloud dataproc clusters create das Flag --metadata flink-start-yarn-session=true hinzu (siehe Dataproc-Flink-Cluster erstellen). Wenn dieses Flag aktiviert ist, führt Dataproc nach dem Erstellen des Clusters /usr/bin/flink-yarn-daemon aus, um eine Flink-Sitzung im Cluster zu starten.

          Die YARN-Anwendungs-ID der Sitzung wird in /tmp/.yarn-properties-${USER} gespeichert. Sie können die ID mit dem Befehl yarn application -list auflisten.

        2. Führen Sie das Flink-Skript yarn-session.sh, das auf der Master-VM des Clusters vorinstalliert ist, mit benutzerdefinierten Einstellungen aus:

          Beispiel mit benutzerdefinierten Einstellungen:

          /usr/lib/flink/bin/yarn-session.sh \
              -s 1 \
              -jm 1024m \
              -tm 2048m \
              -nm flink-dataproc \
              --detached
          
        3. Führen Sie das /usr/bin/flink-yarn-daemon-Wrapper-Skript für Flink mit den Standardeinstellungen aus:

          . /usr/bin/flink-yarn-daemon
          
      2. Job an eine Sitzung senden:Führen Sie den folgenden Befehl aus, um einen Flink-Job an die Sitzung zu senden.

        flink run -m <var>FLINK_MASTER_URL</var>/usr/lib/flink/examples/batch/WordCount.jar
        
        • FLINK_MASTER_URL: Die URL, einschließlich Host und Port, der Flink-Master-VM, auf der Jobs ausgeführt werden. Entfernen Sie http:// prefix aus der URL. Diese URL wird in der Befehlsausgabe aufgeführt, wenn Sie eine Flink-Sitzung starten. Mit dem folgenden Befehl können Sie diese URL im Feld Tracking-URL auflisten:
        yarn application -list -appId=<yarn-app-id> | sed 's#http://##'
           ```
        
      3. Jobs in einer Sitzung auflisten:Führen Sie einen der folgenden Schritte aus, um Flink-Jobs in einer Sitzung aufzulisten:

        • Führen Sie flink list ohne Argumente aus. Der Befehl sucht in /tmp/.yarn-properties-${USER} nach der YARN-Anwendungs-ID der Sitzung.

        • Rufen Sie die YARN-Anwendungs-ID der Sitzung aus /tmp/.yarn-properties-${USER} oder der Ausgabe von yarn application -list ab und führen Sie dann <code>flink list -yid YARN_APPLICATION_ID aus.

        • Führen Sie flink list -m FLINK_MASTER_URL aus.

      4. Sitzung beenden:Rufen Sie zum Beenden der Sitzung die YARN-Anwendungs-ID der Sitzung aus /tmp/.yarn-properties-${USER} oder der Ausgabe von yarn application -list ab und führen Sie dann einen der folgenden Befehle aus:

        echo "stop" | /usr/lib/flink/bin/yarn-session.sh -id YARN_APPLICATION_ID
        
        yarn application -kill YARN_APPLICATION_ID
        

Sie können Apache Beam-Jobs in Dataproc mit dem FlinkRunner ausführen.

Sie können Beam-Jobs auf Flink auf folgende Weise ausführen:

  1. Java Beam-Jobs
  2. Portierbare Beam-Jobs

Java Beam-Jobs

Verpacken Sie Ihre Beam-Jobs in eine JAR-Datei. Geben Sie die gebündelte JAR-Datei mit den Abhängigkeiten an, die zum Ausführen des Jobs erforderlich sind.

Im folgenden Beispiel wird ein Java Beam-Job vom Masterknoten des Dataproc-Clusters ausgeführt.

  1. Erstellen Sie einen Dataproc-Cluster mit aktivierter Flink-Komponente.

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    
    • --optional-components: Flink.
    • --image-version: Die Image-Version des Clusters, die die auf dem Cluster installierte Flink-Version bestimmt (Beispiel: Siehe die aufgelisteten Apache Flink-Komponentenversionen für die neueste und vorherige Version von vier 2.0.x-Image-Releaseversionen).
    • --region: eine unterstützte Dataproc-Region.
    • --enable-component-gateway: Aktiviert den Zugriff auf die Benutzeroberfläche des Flink Job Manager.
    • --scopes: Ermöglichen Sie den Zugriff auf Google Cloud APIs durch Ihren Cluster (siehe Best Practices für Bereiche). Der Bereich cloud-platform ist standardmäßig aktiviert. Sie müssen diese Flageinstellung nicht angeben, wenn Sie einen Cluster erstellen, der die Dataproc-Image-Version 2.1 oder höher verwendet.
  2. Verwenden Sie das SSH-Dienstprogramm, um ein Terminalfenster auf dem Master-Knoten des Flink-Clusters zu öffnen.

  3. Starten Sie eine Flink YARN-Sitzung auf dem Masterknoten des Dataproc-Clusters.

    . /usr/bin/flink-yarn-daemon
    

    Notieren Sie sich die Flink-Version auf Ihrem Dataproc-Cluster.

    flink --version
    
  4. Erstellen Sie auf Ihrem lokalen Computer das Beispiel für die kanonische Beam-Zählung in Java.

    Wählen Sie eine Beam-Version aus, die mit der Flink-Version auf Ihrem Dataproc-Cluster kompatibel ist. In der Tabelle Kompatibilität mit der Flink-Version sind die Versionen der Beam-Flink-Version aufgeführt.

    Öffnen Sie die generierte POM-Datei. Prüfen Sie die Version des Beam-Flink-Runners, die mit dem Tag <flink.artifact.name> angegeben wird. Wenn die Beam-Flink-Runner-Version im Flink-Artefaktnamen nicht mit der Flink-Version auf Ihrem Cluster übereinstimmt, aktualisieren Sie die Versionsnummer entsprechend.

    mvn archetype:generate \
        -DarchetypeGroupId=org.apache.beam \
        -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
        -DarchetypeVersion=BEAM_VERSION \
        -DgroupId=org.example \
        -DartifactId=word-count-beam \
        -Dversion="0.1" \
        -Dpackage=org.apache.beam.examples \
        -DinteractiveMode=false
    
  5. Beispiel für das Verpacken der Wortzahl.

    mvn package -Pflink-runner
    
  6. Laden Sie die Uber-JAR-Paketdatei, word-count-beam-bundled-0.1.jar, (ca. 135 MB) auf den Masterknoten des Dataproc-Clusters hoch. Sie können gcloud storage cp für eine schnellere Dateiübertragung von Cloud Storage zu Ihrem Dataproc-Cluster verwenden.

    1. Erstellen Sie auf Ihrem lokalen Terminal einen Cloud Storage-Bucket und laden Sie die Uber-JAR-Datei hoch.

      gcloud storage buckets create BUCKET_NAME
      
      gcloud storage cp target/word-count-beam-bundled-0.1.jar gs://BUCKET_NAME/
      
    2. Laden Sie auf dem Masterknoten von Dataproc die Uber-JAR-Datei herunter.

      gcloud storage cp gs://BUCKET_NAME/word-count-beam-bundled-0.1.jar .
      
  7. Führen Sie den Java Beam-Job auf dem Masterknoten des Dataproc-Clusters aus.

    flink run -c org.apache.beam.examples.WordCount word-count-beam-bundled-0.1.jar \
        --runner=FlinkRunner \
        --output=gs://BUCKET_NAME/java-wordcount-out
    
  8. Prüfen Sie, ob die Ergebnisse in Ihren Cloud Storage-Bucket geschrieben wurden.

    gcloud storage cat gs://BUCKET_NAME/java-wordcount-out-SHARD_ID
    
  9. Beenden Sie die Flink YARN-Sitzung.

    yarn application -list
    
    yarn application -kill YARN_APPLICATION_ID
    

Portierbare Beam-Jobs

Zum Ausführen von in Python, Go und anderen unterstützten Sprachen geschriebenen Beam-Jobs können Sie FlinkRunner und PortableRunner verwenden, wie im Flink-Runner des Beams beschrieben (siehe auch Portability Framework Roadmap).

Im folgenden Beispiel wird ein portabler Beam-Job in Python aus dem Masterknoten des Dataproc-Clusters ausgeführt.

  1. Erstellen Sie einen Dataproc-Cluster, bei dem sowohl die Flink- als auch die Docker-Komponenten aktiviert sind.

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK,DOCKER \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    

    Hinweise:

    • --optional-components: Flink und Docker
    • --image-version: Die Image-Version des Clusters, die die auf dem Cluster installierte Flink-Version bestimmt (Beispiel: Siehe die aufgelisteten Apache Flink-Komponentenversionen für die neueste und vorherige Version von vier 2.0.x-Image-Releaseversionen).
    • --region: Eine verfügbare Dataproc-Region.
    • --enable-component-gateway: Aktiviert den Zugriff auf die Benutzeroberfläche des Flink Job Manager.
    • --scopes: Ermöglichen Sie den Zugriff auf Google Cloud APIs durch Ihren Cluster (siehe Best Practices für Bereiche). Der Bereich cloud-platform ist standardmäßig aktiviert. Sie müssen diese Flageinstellung nicht angeben, wenn Sie einen Cluster erstellen, der die Dataproc-Image-Version 2.1 oder höher verwendet.
  2. Verwenden Sie die gcloud CLI lokal oder in Cloud Shell, um einen Cloud Storage-Bucket zu erstellen. Sie geben BUCKET_NAME an, wenn Sie ein Beispielprogramm für die Wortzählung ausführen.

    gcloud storage buckets create BUCKET_NAME
    
  3. Starten Sie in einem Terminalfenster auf der Cluster-VM eine Flink YARN-Sitzung. Notieren Sie sich die Flink-Master-URL, die Adresse des Flink-Masters, auf dem Jobs ausgeführt werden. Sie geben FLINK_MASTER_URL an, wenn Sie ein Beispielprogramm für die Wortzählung ausführen.

    . /usr/bin/flink-yarn-daemon
    

    Lassen Sie sich die Flink-Version anzeigen, die auf dem Dataproc-Cluster ausgeführt wird, und notieren Sie sie sich. Sie geben FLINK_VERSION an, wenn Sie ein Beispielprogramm für die Wortzählung ausführen.

    flink --version
    
  4. Installieren Sie die für den Job erforderlichen Python-Bibliotheken auf dem Masterknoten des Clusters.

  5. Installieren Sie eine Beam-Version, die mit der Flink-Version auf dem Cluster kompatibel ist.

    python -m pip install apache-beam[gcp]==BEAM_VERSION
    
  6. Führen Sie das Beispiel für die Wortzählung auf dem Masterknoten des Clusters aus.

    python -m apache_beam.examples.wordcount \
        --runner=FlinkRunner \
        --flink_version=FLINK_VERSION \
        --flink_master=FLINK_MASTER_URL
        --flink_submit_uber_jar \
        --output=gs://BUCKET_NAME/python-wordcount-out
    

    Hinweise:

    • --runner: FlinkRunner.
    • --flink_version: FLINK_VERSION, wie oben beschrieben.
    • --flink_master: FLINK_MASTER_URL, wie oben beschrieben.
    • --flink_submit_uber_jar: Verwenden Sie die Uber-JAR-Datei, um den Beam-Job auszuführen.
    • --output: BUCKET_NAME, die zuvor erstellt wurde.
  7. Prüfen Sie, ob die Ergebnisse in Ihren Bucket geschrieben wurden.

    gcloud storage cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID
    
  8. Beenden Sie die Flink YARN-Sitzung.

    1. Rufen Sie die Anwendungs-ID ab.
    yarn application -list
    
    1. Insert the <var>YARN_APPLICATION_ID</var>, then stop the session.
    
    yarn application -kill 
    

Die Dataproc Flink-Komponente unterstützt mit Kerberos gesicherte Cluster. Ein Kerberos-Ticket ist erforderlich, um einen Flink-Job zu senden und beizubehalten oder einen Flink-Cluster zu starten. Standardmäßig bleibt ein Kerberos-Ticket sieben Tage lang gültig.

Die Flink Job Manager-Weboberfläche ist verfügbar, während ein Flink-Job oder ein Flink-Sitzungscluster ausgeführt wird. So verwenden Sie die Weboberfläche:

  1. Dataproc-Flink-Cluster erstellen.
  2. Nach der Clustererstellung klicken Sie in der Google Cloud Console auf der Seite Clusterdetails auf den Tab der Web-Benutzeroberfläche und auf das Component Gateway YARN ResourceManager-Link.
  3. Ermitteln Sie auf der Benutzeroberfläche von YARN Resource Manager den Anwendungseintrag des Flink-Clusters. Je nach Abschlussstatus des Jobs wird ein Link zu ApplicationMaster oder History angezeigt.
  4. Bei einem Streamingjob mit langer Ausführungszeit klicken Sie auf den Link ApplicationManager, um das Flink-Dashboard zu öffnen. Klicken Sie für einen abgeschlossenen Job auf den Link History (Verlauf), um Jobdetails aufzurufen.