Sie können zusätzliche Komponenten wie Flink aktivieren, wenn Sie einen Dataproc-Cluster mit der Funktion Optionale Komponenten erstellen. Auf dieser Seite erfahren Sie, wie Sie einen Dataproc-Cluster mit der optionalen Komponente Apache Flink erstellen (ein Flink-Cluster) und dann Flink-Jobs auf dem Cluster ausführen.
Sie können Ihren Flink-Cluster für Folgendes verwenden:
Flink-Jobs mit der Dataproc-
Jobs
-Ressource über die Google Cloud Console, die Google Cloud CLI oder die Dataproc API ausführenFlink-Jobs mit der
flink
CLI ausführen, die auf dem Masterknoten des Flink-Clusters ausgeführt wird.
Dataproc-Flink-Cluster erstellen
Sie können die Google Cloud Console, die Google Cloud CLI oder die Dataproc API verwenden, um einen Dataproc-Cluster zu erstellen, in dem die Flink-Komponente aktiviert ist.
Empfehlung:Verwenden Sie einen Standard-VM-Cluster mit einem Master und der Flink-Komponente. Dataproc-Cluster im Modus für hohe Verfügbarkeit (mit 3 Master-VMs) unterstützen den Flink-Modus für hohe Verfügbarkeit nicht.
Console
So erstellen Sie mit der Google Cloud Console einen Dataproc-Flink-Cluster:
Öffnen Sie die Dataproc-Seite Dataproc-Cluster in Compute Engine erstellen.
- Der Bereich Cluster einrichten ist ausgewählt.
- Bestätigen oder ändern Sie im Abschnitt Versionsverwaltung den Image-Typ und die Image-Version. Die Cluster-Image-Version bestimmt die Version der im Cluster installierten Flink-Komponente.
- Die Image-Version muss mindestens 1.5 sein, um die Flink-Komponente im Cluster zu aktivieren. Eine Liste der Komponentenversionen, die in den einzelnen Dataproc-Image-Releases enthalten sind, finden Sie unter Unterstützte Dataproc-Versionen.
- Die Image-Version muss [TBD] oder höher sein, um Flink-Jobs über die Dataproc Jobs API auszuführen (siehe Dataproc Flink-Jobs ausführen).
- Im Bereich Komponenten:
- Wählen Sie unter Component Gateway die Option Component Gateway aktivieren aus. Sie müssen das Component Gateway aktivieren, um die Verknüpfung des Component Gateways mit der Flink-Verlaufsserver-Benutzeroberfläche zu aktivieren. Wenn Sie das Komponentengateway aktivieren, können Sie auch auf die Weboberfläche von Flink Job Manager zugreifen, die auf dem Flink-Cluster ausgeführt wird.
- Wählen Sie unter Optionale Komponenten Flink und andere optionale Komponenten aus, die auf Ihrem Cluster aktiviert werden sollen.
- Bestätigen oder ändern Sie im Abschnitt Versionsverwaltung den Image-Typ und die Image-Version. Die Cluster-Image-Version bestimmt die Version der im Cluster installierten Flink-Komponente.
Klicken Sie auf den Bereich Cluster anpassen (optional).
Klicken Sie im Bereich Cluster-Attribute für jede optionale Cluster-Eigenschaft, die Sie dem Cluster hinzufügen möchten, auf Attribute hinzufügen. Sie können
flink
-Properties mit Präfix hinzufügen, um Flink-Properties in/etc/flink/conf/flink-conf.yaml
zu konfigurieren, die als Standard für Flink-Anwendungen dienen, die Sie im Cluster ausführen.Beispiele:
- Mit
flink:historyserver.archive.fs.dir
können Sie den Cloud Storage-Speicherort angeben, an dem Flink-Jobverlaufsdateien geschrieben werden sollen. Dieser Speicherort wird vom Flink History Server verwendet, der im Flink-Cluster ausgeführt wird. - Legen Sie Flink-Aufgabenslots mit
flink:taskmanager.numberOfTaskSlots=n
fest.
- Mit
Klicken Sie im Abschnitt Benutzerdefinierte Clustermetadaten auf Metadaten hinzufügen, um optionale Metadaten hinzuzufügen. Fügen Sie beispielsweise
flink-start-yarn-session
true
hinzu, um den Flink YARN-Daemon (/usr/bin/flink-yarn-daemon
) im Hintergrund auf dem Clustermasterknoten auszuführen und eine Flink YARN-Sitzung zu starten (siehe Flink-Sitzungsmodus).
Wenn Sie die Dataproc-Image-Version 2.0 oder niedriger verwenden, klicken Sie auf den Bereich Sicherheit verwalten (optional) und wählen Sie dann unter Projektzugriff die Option
Enables the cloud-platform scope for this cluster
aus. Der Bereichcloud-platform
ist standardmäßig aktiviert, wenn Sie einen Cluster erstellen, der die Dataproc-Image-Version 2.1 oder höher verwendet.
- Der Bereich Cluster einrichten ist ausgewählt.
Klicken Sie auf Erstellen, um den Cluster zu erstellen.
gcloud
Wenn Sie mit der gcloud CLI einen Dataproc-Flink-Cluster erstellen möchten, führen Sie den folgenden Befehl gcloud dataproc clusters create lokal in einem Terminalfenster oder in Cloud Shell aus:
gcloud dataproc clusters create CLUSTER_NAME \ --region=REGION \ --image-version=DATAPROC_IMAGE_VERSION \ --optional-components=FLINK \ --enable-component-gateway \ --properties=PROPERTIES ... other flags
Hinweise:
- CLUSTER_NAME: Geben Sie den Namen des Clusters an.
- REGION: Geben Sie eine Compute Engine-Region an, in der sich der Cluster befinden soll.
DATAPROC_IMAGE_VERSION: Optional können Sie die Imageversion angeben, die im Cluster verwendet werden soll. Die Cluster-Image-Version bestimmt die Version der im Cluster installierten Flink-Komponente.
Die Image-Version muss mindestens 1.5 sein, um die Flink-Komponente im Cluster zu aktivieren. Eine Liste der Komponentenversionen, die in den einzelnen Dataproc-Image-Releases enthalten sind, finden Sie unter Unterstützte Dataproc-Versionen.
Die Image-Version muss [TBD] oder höher sein, um Flink-Jobs über die Dataproc Jobs API auszuführen (siehe Dataproc Flink-Jobs ausführen).
--optional-components
: Sie müssen dieFLINK
-Komponente angeben, um Flink-Jobs und den Flink HistoryServer-Webdienst auf dem Cluster auszuführen.--enable-component-gateway
: Sie müssen das Component Gateway aktivieren, um den Link zum Flink-Verlaufsserver über das Component Gateway zu aktivieren. Wenn Sie das Komponentengateway aktivieren, können Sie auch auf die Flink Job Manager-Weboberfläche zugreifen, die auf dem Flink-Cluster ausgeführt wird.PROPERTIES: Optional können Sie eine oder mehrere Clustereigenschaften angeben.
Wenn Sie Dataproc-Cluster mit den Image-Versionen
2.0.67
und höher sowie2.1.15
und höher erstellen, können Sie mit dem Flag--properties
Flink-Eigenschaften in/etc/flink/conf/flink-conf.yaml
konfigurieren, die als Standard für Flink-Anwendungen dienen, die Sie im Cluster ausführen.Sie können
flink:historyserver.archive.fs.dir
festlegen, um den Cloud Storage-Speicherort für das Schreiben von Flink-Jobverlaufsdateien anzugeben. Dieser Speicherort wird vom Flink-Verlaufsserver verwendet, der auf dem Flink-Cluster ausgeführt wird.Beispiel für mehrere Unterkünfte:
--properties=flink:historyserver.archive.fs.dir=gs://my-bucket/my-flink-cluster/completed-jobs,flink:taskmanager.numberOfTaskSlots=2
Weitere Flags:
- Sie können das optionale Flag
--metadata flink-start-yarn-session=true
hinzufügen, um den Flink YARN-Daemon (/usr/bin/flink-yarn-daemon
) im Hintergrund auf dem Clustermasterknoten auszuführen und eine Flink YARN-Sitzung zu starten (siehe Flink-Sitzungsmodus).
- Sie können das optionale Flag
Wenn Sie die Imageversion 2.0 oder eine frühere Version verwenden, können Sie das Flag
--scopes=https://www.googleapis.com/auth/cloud-platform
hinzufügen, um Ihrem Cluster Zugriff auf Google Cloud APIs zu gewähren (siehe Best Practices für Bereiche). Der Bereichcloud-platform
ist standardmäßig aktiviert, wenn Sie einen Cluster erstellen, der die Dataproc-Image-Version 2.1 oder höher verwendet.
API
Wenn Sie einen Dataproc-Flink-Cluster mit der Dataproc API erstellen möchten, senden Sie eine clusters.create-Anfrage. Gehen Sie dazu so vor:
Hinweise:
Legen Sie für SoftwareConfig.Component den Wert
FLINK
fest.Optional können Sie
SoftwareConfig.imageVersion
angeben, um die Image-Version für den Cluster festzulegen. Die Cluster-Image-Version bestimmt die Version der im Cluster installierten Flink-Komponente.Die Image-Version muss mindestens 1.5 sein, um die Flink-Komponente im Cluster zu aktivieren. Eine Liste der Komponentenversionen, die in den einzelnen Dataproc-Image-Releases enthalten sind, finden Sie unter Unterstützte Dataproc-Versionen.
Die Image-Version muss [TBD] oder höher sein, um Flink-Jobs über die Dataproc Jobs API auszuführen (siehe Dataproc Flink-Jobs ausführen).
Legen Sie EndpointConfig.enableHttpPortAccess auf
true
fest, um den Link zum Component Gateway zur Flink-Verlaufsserver-Benutzeroberfläche zu aktivieren. Wenn Sie das Komponentengateway aktivieren, können Sie auch auf die Flink Job Manager-Weboberfläche zugreifen, die auf dem Flink-Cluster ausgeführt wird.Optional können Sie
SoftwareConfig.properties
festlegen, um eine oder mehrere Clustereigenschaften anzugeben.- Sie können Flink-Eigenschaften angeben, die als Standard für Flink-Anwendungen dienen, die Sie im Cluster ausführen. Sie können beispielsweise das
flink:historyserver.archive.fs.dir
-Flag verwenden, um den Cloud Storage-Speicherort anzugeben, an dem Flink-Jobverlaufsdateien geschrieben werden sollen. Dieser Speicherort wird vom Flink-Verlaufsserver verwendet, der auf dem Flink-Cluster ausgeführt wird.
- Sie können Flink-Eigenschaften angeben, die als Standard für Flink-Anwendungen dienen, die Sie im Cluster ausführen. Sie können beispielsweise das
Optional können Sie Folgendes festlegen:
GceClusterConfig.metadata
. Sie können beispielsweiseflink-start-yarn-session
true
angeben, um den Flink YARN-Daemon (/usr/bin/flink-yarn-daemon
) im Hintergrund auf dem Clustermasterknoten auszuführen und eine Flink YARN-Sitzung zu starten (siehe Flink-Sitzungsmodus).- GceClusterConfig.serviceAccountScopes auf
https://www.googleapis.com/auth/cloud-platform
(cloud-platform
-Bereich), wenn Sie Image-Versionen 2.0 oder niedriger verwenden, um den Zugriff Ihres Clusters auf Google Cloud APIs zu ermöglichen (siehe Best Practices für Bereiche). Der Bereichcloud-platform
ist standardmäßig aktiviert, wenn Sie einen Cluster erstellen, der die Dataproc-Image-Version 2.1 oder höher verwendet.
Nach dem Erstellen eines Flink-Clusters
- Über den Link
Flink History Server
im Component Gateway können Sie den Flink-Verlaufsserver aufrufen, der auf dem Flink-Cluster ausgeführt wird. - Verwenden Sie das
YARN ResourceManager link
im Component Gateway, um die Flink Job Manager-Weboberfläche aufzurufen, die auf dem Flink-Cluster ausgeführt wird . - Erstellen Sie einen Dataproc Persistent History Server, um Flink-Jobverlaufsdateien aufzurufen, die von vorhandenen und gelöschten Flink-Clustern geschrieben wurden.
Flink-Jobs mit der Dataproc-Ressource Jobs
ausführen
Sie können Flink-Jobs mit der Dataproc-Ressource Jobs
über die Google Cloud Console, die Google Cloud CLI oder die Dataproc API ausführen.
Console
So reichen Sie einen Beispiel-Flink-Wordcount-Job über die Console ein:
Öffnen Sie die Dataproc-Seite Job senden in der Google Cloud Console in Ihrem Browser.
Füllen Sie die Felder auf der Seite Job senden aus:
- Wählen Sie den Namen des Clusters aus der Clusterliste aus.
- Legen Sie für Job type (Jobtyp) den Wert
Flink
fest. - Legen Sie für Main class or jar (Hauptklasse oder JAR-Datei) den Wert
org.apache.flink.examples.java.wordcount.WordCount
fest. - Legen Sie für Jar-Dateien den Wert
file:///usr/lib/flink/examples/batch/WordCount.jar
fest.file:///
steht für eine Datei im Cluster.WordCount.jar
wurde von Dataproc beim Erstellen des Flink-Clusters installiert.- In diesem Feld ist auch ein Cloud Storage-Pfad (
gs://BUCKET/JARFILE
) oder ein Hadoop Distributed File System-Pfad (HDFS,hdfs://PATH_TO_JAR
) zulässig.
Klicken Sie auf Senden.
- Die Job-Treiberausgabe wird auf der Seite Jobdetails angezeigt.
- Flink-Jobs werden in der Google Cloud Console auf der Seite Dataproc-Jobs aufgeführt.
- Klicken Sie auf der Seite Jobs oder Jobdetails auf Beenden oder Löschen, um einen Job zu beenden oder zu löschen.
gcloud
Wenn Sie einen Flink-Job an einen Dataproc-Flink-Cluster senden möchten, führen Sie den Befehl gcloud dataproc jobs submit lokal in einem Terminalfenster oder in Cloud Shell aus.
gcloud dataproc jobs submit flink \ --cluster=CLUSTER_NAME \ --region=REGION \ --class=MAIN_CLASS \ --jar=JAR_FILE \ -- JOB_ARGS
Hinweise:
- CLUSTER_NAME: Geben Sie den Namen des Dataproc-Flink-Clusters an, an den der Job gesendet werden soll.
- REGION: Geben Sie eine Compute Engine-Region an, in der sich der Cluster befindet.
- MAIN_CLASS: Geben Sie die
main
-Klasse Ihrer Flink-Anwendung an, z. B.:org.apache.flink.examples.java.wordcount.WordCount
- JAR_FILE: Geben Sie die JAR-Datei der Flink-Anwendung an. Sie können Folgendes angeben:
- Eine JAR-Datei, die im Cluster mit dem Präfix
file:///` installiert ist:
file:///usr/lib/flink/examples/streaming/TopSpeedWindowing.jar
file:///usr/lib/flink/examples/batch/WordCount.jar
- Eine JAR-Datei in Cloud Storage:
gs://BUCKET/JARFILE
- Eine JAR-Datei in HDFS:
hdfs://PATH_TO_JAR
- Eine JAR-Datei, die im Cluster mit dem Präfix
JOB_ARGS: Optional können Sie nach dem doppelten Bindestrich (
--
) Jobargumente hinzufügen.Nachdem Sie den Job gesendet haben, wird die Jobtreiberausgabe im lokalen oder Cloud Shell-Terminal angezeigt.
Program execution finished Job with JobID 829d48df4ebef2817f4000dfba126e0f has finished. Job Runtime: 13610 ms ... (after,1) (and,12) (arrows,1) (ay,1) (be,4) (bourn,1) (cast,1) (coil,1) (come,1)
REST
In diesem Abschnitt wird gezeigt, wie Sie einen Flink-Job mithilfe der Dataproc API jobs.submit an einen Dataproc-Flink-Cluster senden.
Ersetzen Sie diese Werte in den folgenden Anfragedaten:
- PROJECT_ID: ID des Google Cloud-Projekts.
- REGION: Cluster-Region
- CLUSTER_NAME: Geben Sie den Namen des Dataproc Flink-Clusters an, an den der Job gesendet werden soll.
HTTP-Methode und URL:
POST https://dataproc.googleapis.com/v1/projects/PROJECT_ID/regions/REGION/jobs:submit
JSON-Text anfordern:
{ "job": { "placement": { "clusterName": "CLUSTER_NAME" }, "flinkJob": { "mainClass": "org.apache.flink.examples.java.wordcount.WordCount", "jarFileUris": [ "file:///usr/lib/flink/examples/batch/WordCount.jar" ] } } }
Wenn Sie die Anfrage senden möchten, maximieren Sie eine der folgenden Optionen:
Sie sollten in etwa folgende JSON-Antwort erhalten:
{ "reference": { "projectId": "PROJECT_ID", "jobId": "JOB_ID" }, "placement": { "clusterName": "CLUSTER_NAME", "clusterUuid": "CLUSTER_UUID" }, "flinkJob": { "mainClass": "org.apache.flink.examples.java.wordcount.WordCount", "args": [ "1000" ], "jarFileUris": [ "file:///usr/lib/flink/examples/batch/WordCount.jar" ] }, "status": { "state": "PENDING", "stateStartTime": "2020-10-07T20:16:21.759Z" }, "jobUuid": "JOB_UUID" }
- Flink-Jobs werden in der Google Cloud Console auf der Seite Dataproc-Jobs aufgeführt.
- Sie können in der Google Cloud Console auf der Seite Jobs oder Jobdetails auf Beenden oder Löschen klicken, um einen Job zu beenden oder zu löschen.
Flink-Jobs mit der flink
-Befehlszeile ausführen
Anstatt Flink-Jobs mit der Dataproc-Ressource Jobs
auszuführen, können Sie Flink-Jobs mit der flink
-Befehlszeile auf dem Masterknoten Ihres Flink-Clusters ausführen.
In den folgenden Abschnitten werden verschiedene Möglichkeiten beschrieben, wie Sie einen flink
-Befehlszeilenjob in Ihrem Dataproc-Flink-Cluster ausführen können.
SSH-Verbindung zum Masterknoten herstellen:Verwenden Sie das SSH, um ein Terminalfenster auf der Clustermaster-VM zu öffnen.
Classpath festlegen:Initialisieren Sie den Hadoop-Classpath über das SSH-Terminalfenster auf der Flink-Cluster-Master-VM:
export HADOOP_CLASSPATH=$(hadoop classpath)
Flink-Jobs ausführen:Sie können Flink-Jobs in verschiedenen Bereitstellungsmodi in YARN ausführen: Anwendungs-, Job- und Sitzungsmodus.
Anwendungsmodus:Der Flink-Anwendungsmodus wird ab der Dataproc-Image-Version 2.0 unterstützt. In diesem Modus wird die
main()
-Methode des Jobs im YARN-Jobmanager ausgeführt. Der Cluster wird nach Abschluss des Jobs heruntergefahren.Beispiel für die Jobsendung:
flink run-application \ -t yarn-application \ -Djobmanager.memory.process.size=1024m \ -Dtaskmanager.memory.process.size=2048m \ -Djobmanager.heap.mb=820 \ -Dtaskmanager.heap.mb=1640 \ -Dtaskmanager.numberOfTaskSlots=2 \ -Dparallelism.default=4 \ /usr/lib/flink/examples/batch/WordCount.jar
Laufende Jobs auflisten:
./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
So brechen Sie einen laufenden Job ab:
./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
Jobspezifischer Modus:In diesem Flink-Modus wird die
main()
-Methode des Jobs auf der Clientseite ausgeführt.Beispiel für die Jobsendung:
flink run \ -m yarn-cluster \ -p 4 \ -ys 2 \ -yjm 1024m \ -ytm 2048m \ /usr/lib/flink/examples/batch/WordCount.jar
Sitzungsmodus:Starten Sie eine lang andauernde Flink-YARN-Sitzung und reichen Sie dann einen oder mehrere Jobs für die Sitzung ein.
Sitzung starten:Sie haben folgende Möglichkeiten, eine Flink-Sitzung zu starten:
Erstellen Sie einen Flink-Cluster, indem Sie dem Befehl
gcloud dataproc clusters create
das Flag--metadata flink-start-yarn-session=true
hinzufügen. Weitere Informationen finden Sie unter Dataproc-Flink-Cluster erstellen. Wenn dieses Flag aktiviert ist, führt Dataproc nach dem Erstellen des Clusters/usr/bin/flink-yarn-daemon
aus, um eine Flink-Sitzung auf dem Cluster zu starten.Die YARN-Anwendungs-ID der Sitzung wird in
/tmp/.yarn-properties-${USER}
gespeichert. Sie können die ID mit dem Befehlyarn application -list
auflisten.Führen Sie das Flink-Skript
yarn-session.sh
aus, das auf der Cluster-Master-VM vorinstalliert ist, mit benutzerdefinierten Einstellungen aus:Beispiel mit benutzerdefinierten Einstellungen:
/usr/lib/flink/bin/yarn-session.sh \ -s 1 \ -jm 1024m \ -tm 2048m \ -nm flink-dataproc \ --detached
Führen Sie das Flink-
/usr/bin/flink-yarn-daemon
-Wrapper-Script mit den Standardeinstellungen aus:. /usr/bin/flink-yarn-daemon
Job an eine Sitzung senden:Führen Sie den folgenden Befehl aus, um einen Flink-Job an die Sitzung zu senden.
flink run -m <var>FLINK_MASTER_URL</var>/usr/lib/flink/examples/batch/WordCount.jar
- FLINK_MASTER_URL: die URL, einschließlich Host und Port, der Flink-Master-VM, auf der Jobs ausgeführt werden.
Entfernen Sie das
http:// prefix
aus der URL. Diese URL wird in der Befehlsausgabe aufgeführt, wenn Sie eine Flink-Sitzung starten. Mit dem folgenden Befehl können Sie diese URL im FeldTracking-URL
auflisten:
yarn application -list -appId=<yarn-app-id> | sed 's#http://##' ```
- FLINK_MASTER_URL: die URL, einschließlich Host und Port, der Flink-Master-VM, auf der Jobs ausgeführt werden.
Entfernen Sie das
Jobs in einer Sitzung auflisten:So listen Sie Flink-Jobs in einer Sitzung auf:
Führen Sie
flink list
ohne Argumente aus. Der Befehl sucht in/tmp/.yarn-properties-${USER}
nach der YARN-Anwendungs-ID der Sitzung.Rufen Sie die YARN-Anwendungs-ID der Sitzung aus
/tmp/.yarn-properties-${USER}
oder der Ausgabe vonyarn application -list
ab und führen Sie dann<code>
flink list -yid YARN_APPLICATION_ID aus.Führen Sie
flink list -m FLINK_MASTER_URL
aus.
Sitzung beenden:Um die Sitzung zu beenden, rufen Sie die YARN-Anwendungs-ID der Sitzung aus
/tmp/.yarn-properties-${USER}
oder der Ausgabe vonyarn application -list
ab und führen Sie dann einen der folgenden Befehle aus:echo "stop" | /usr/lib/flink/bin/yarn-session.sh -id YARN_APPLICATION_ID
yarn application -kill YARN_APPLICATION_ID
Apache Beam-Jobs in Flink ausführen
Sie können Apache Beam-Jobs in Dataproc mit dem FlinkRunner
ausführen.
Sie können Beam-Jobs auf Flink auf folgende Weise ausführen:
- Java Beam-Jobs
- Portierbare Beam-Jobs
Java Beam-Jobs
Verpacken Sie Ihre Beam-Jobs in eine JAR-Datei. Geben Sie die gebündelte JAR-Datei mit den Abhängigkeiten an, die zum Ausführen des Jobs erforderlich sind.
Im folgenden Beispiel wird ein Java Beam-Job vom Masterknoten des Dataproc-Clusters ausgeführt.
Erstellen Sie einen Dataproc-Cluster mit aktivierter Flink-Komponente.
gcloud dataproc clusters create CLUSTER_NAME \ --optional-components=FLINK \ --image-version=DATAPROC_IMAGE_VERSION \ --region=REGION \ --enable-component-gateway \ --scopes=https://www.googleapis.com/auth/cloud-platform
--optional-components
: Flink.--image-version
: Die Image-Version des Clusters, die die auf dem Cluster installierte Flink-Version bestimmt (Beispiel: Siehe die aufgelisteten Apache Flink-Komponentenversionen für die neueste und vorherige Version von vier 2.0.x-Image-Releaseversionen).--region
: eine unterstützte Dataproc-Region.--enable-component-gateway
: Aktiviert den Zugriff auf die Benutzeroberfläche des Flink Job Manager.--scopes
: Damit wird der Zugriff Ihres Clusters auf Google Cloud APIs aktiviert (siehe Best Practices für Bereiche). Der Bereichcloud-platform
ist standardmäßig aktiviert (Sie müssen diese Flag-Einstellung nicht angeben), wenn Sie einen Cluster erstellen, für den die Dataproc-Image-Version 2.1 oder höher verwendet wird.
Verwenden Sie das SSH-Dienstprogramm, um ein Terminalfenster auf dem Masterknoten des Flink-Clusters zu öffnen.
Starten Sie eine Flink YARN-Sitzung auf dem Masterknoten des Dataproc-Clusters.
. /usr/bin/flink-yarn-daemon
Notieren Sie sich die Flink-Version auf Ihrem Dataproc-Cluster.
flink --version
Erstellen Sie auf Ihrem lokalen Computer das Beispiel für die kanonische Beam-Zählung in Java.
Wählen Sie eine Beam-Version aus, die mit der Flink-Version auf Ihrem Dataproc-Cluster kompatibel ist. In der Tabelle Kompatibilität mit der Flink-Version sind die Versionen der Beam-Flink-Version aufgeführt.
Öffnen Sie die generierte POM-Datei. Prüfen Sie die Version des Beam-Flink-Runners, die mit dem Tag
<flink.artifact.name>
angegeben wird. Wenn die Beam-Flink-Runner-Version im Flink-Artefaktnamen nicht mit der Flink-Version auf Ihrem Cluster übereinstimmt, aktualisieren Sie die Versionsnummer entsprechend.mvn archetype:generate \ -DarchetypeGroupId=org.apache.beam \ -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \ -DarchetypeVersion=BEAM_VERSION \ -DgroupId=org.example \ -DartifactId=word-count-beam \ -Dversion="0.1" \ -Dpackage=org.apache.beam.examples \ -DinteractiveMode=false
Beispiel für das Verpacken der Wortzahl.
mvn package -Pflink-runner
Laden Sie die Uber-JAR-Paketdatei,
word-count-beam-bundled-0.1.jar
, (ca. 135 MB) auf den Masterknoten des Dataproc-Clusters hoch. Sie könnengcloud storage cp
für eine schnellere Dateiübertragung von Cloud Storage zu Ihrem Dataproc-Cluster verwenden.Erstellen Sie auf Ihrem lokalen Terminal einen Cloud Storage-Bucket und laden Sie die Uber-JAR-Datei hoch.
gcloud storage buckets create BUCKET_NAME
gcloud storage cp target/word-count-beam-bundled-0.1.jar gs://BUCKET_NAME/
Laden Sie auf dem Masterknoten von Dataproc die Uber-JAR-Datei herunter.
gcloud storage cp gs://BUCKET_NAME/word-count-beam-bundled-0.1.jar .
Führen Sie den Java Beam-Job auf dem Masterknoten des Dataproc-Clusters aus.
flink run -c org.apache.beam.examples.WordCount word-count-beam-bundled-0.1.jar \ --runner=FlinkRunner \ --output=gs://BUCKET_NAME/java-wordcount-out
Prüfen Sie, ob die Ergebnisse in Ihren Cloud Storage-Bucket geschrieben wurden.
gcloud storage cat gs://BUCKET_NAME/java-wordcount-out-SHARD_ID
Beenden Sie die Flink YARN-Sitzung.
yarn application -list
yarn application -kill YARN_APPLICATION_ID
Portierbare Beam-Jobs
Zum Ausführen von in Python, Go und anderen unterstützten Sprachen geschriebenen Beam-Jobs können Sie FlinkRunner
und PortableRunner
verwenden, wie im Flink-Runner des Beams beschrieben (siehe auch Portability Framework Roadmap).
Im folgenden Beispiel wird ein portabler Beam-Job in Python aus dem Masterknoten des Dataproc-Clusters ausgeführt.
Erstellen Sie einen Dataproc-Cluster, bei dem sowohl die Flink- als auch die Docker-Komponenten aktiviert sind.
gcloud dataproc clusters create CLUSTER_NAME \ --optional-components=FLINK,DOCKER \ --image-version=DATAPROC_IMAGE_VERSION \ --region=REGION \ --enable-component-gateway \ --scopes=https://www.googleapis.com/auth/cloud-platform
Hinweise:
--optional-components
: Flink und Docker--image-version
: Die Image-Version des Clusters, die die auf dem Cluster installierte Flink-Version bestimmt (siehe die Apache Flink-Komponentenversionen für die letzten und vorherigen vier 2.0.x-Image-Releaseversionen).--region
: Eine verfügbare Dataproc-Region.--enable-component-gateway
: Aktivieren Sie den Zugriff auf die Benutzeroberfläche des Flink Job Managers.--scopes
: Aktivieren Sie den Zugriff Ihres Clusters auf Google Cloud APIs (siehe Best Practices für Bereiche). Der Bereichcloud-platform
ist standardmäßig aktiviert (Sie müssen diese Flag-Einstellung nicht angeben), wenn Sie einen Cluster erstellen, für den die Dataproc-Image-Version 2.1 oder höher verwendet wird.
Verwenden Sie die gcloud CLI lokal oder in Cloud Shell, um einen Cloud Storage-Bucket zu erstellen. Sie geben BUCKET_NAME an, wenn Sie ein Beispielprogramm für die Wortzählung ausführen.
gcloud storage buckets create BUCKET_NAME
Starten Sie in einem Terminalfenster auf der Cluster-VM eine Flink-YARN-Sitzung. Notieren Sie sich die Flink-Master-URL, die Adresse des Flink-Masters, auf dem Jobs ausgeführt werden. Sie geben FLINK_MASTER_URL an, wenn Sie ein Beispielprogramm für die Wortanzahl ausführen.
. /usr/bin/flink-yarn-daemon
Rufen Sie die Flink-Version auf, die auf dem Dataproc-Cluster ausgeführt wird, und notieren Sie sich diese. Sie geben FLINK_VERSION an, wenn Sie ein Beispielprogramm für die Wortanzahl ausführen.
flink --version
Installieren Sie die für den Job erforderlichen Python-Bibliotheken auf dem Clustermasterknoten.
Installieren Sie eine Beam-Version, die mit der Flink-Version im Cluster kompatibel ist.
python -m pip install apache-beam[gcp]==BEAM_VERSION
Führen Sie das Beispiel für die Wortzählung auf dem Masterknoten des Clusters aus.
python -m apache_beam.examples.wordcount \ --runner=FlinkRunner \ --flink_version=FLINK_VERSION \ --flink_master=FLINK_MASTER_URL --flink_submit_uber_jar \ --output=gs://BUCKET_NAME/python-wordcount-out
Hinweise:
--runner
:FlinkRunner
.--flink_version
: FLINK_VERSION, wie oben angegeben.--flink_master
: FLINK_MASTER_URL, wie oben angegeben.--flink_submit_uber_jar
: Verwenden Sie die Uber-JAR-Datei, um den Beam-Job auszuführen.--output
: BUCKET_NAME, das Sie zuvor erstellt haben.
Prüfen Sie, ob die Ergebnisse in Ihren Bucket geschrieben wurden.
gcloud storage cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID
Beenden Sie die Flink YARN-Sitzung.
- Rufen Sie die Anwendungs-ID ab.
yarn application -list
1. Insert the <var>YARN_APPLICATION_ID</var>, then stop the session.yarn application -kill
Flink auf einem kerberisierten Cluster ausführen
Die Dataproc Flink-Komponente unterstützt mit Kerberos gesicherte Cluster. Ein gültiges Kerberos-Ticket ist erforderlich, um einen Flink-Job zu senden und beizubehalten oder einen Flink-Cluster zu starten. Standardmäßig bleibt ein Kerberos-Ticket sieben Tage lang gültig.
Auf die Benutzeroberfläche von Flink Job Manager zugreifen
Die Flink Job Manager-Weboberfläche ist verfügbar, während ein Flink-Job oder ein Flink-Sitzungscluster ausgeführt wird. So verwenden Sie die Weboberfläche:
- Erstellen Sie einen Dataproc-Flink-Cluster.
- Klicken Sie nach der Clustererstellung in der Google Cloud Console auf der Seite Clusterdetails auf den Tab der Web-Benutzeroberfläche und auf das Component Gateway YARN ResourceManager-Link.
- Ermitteln Sie auf der Benutzeroberfläche von YARN Resource Manager den Anwendungseintrag des Flink-Clusters. Je nach Abschlussstatus des Jobs wird ein Link zu ApplicationMaster oder History angezeigt.
- Bei einem Streamingjob mit langer Ausführungszeit klicken Sie auf den Link ApplicationManager, um das Flink-Dashboard zu öffnen. Klicken Sie für einen abgeschlossenen Job auf den Link History (Verlauf), um Jobdetails aufzurufen.