Anda dapat mengaktifkan komponen tambahan seperti Flink saat membuat cluster Dataproc menggunakan fitur Optional components. Halaman ini menunjukkan cara membuat cluster Dataproc dengan komponen opsional Apache Flink yang diaktifkan (cluster Flink), lalu menjalankan tugas Flink di cluster.
Anda dapat menggunakan cluster Flink untuk:
Jalankan tugas Flink menggunakan resource
Jobs
Dataproc dari Konsol Google Cloud, Google Cloud CLI, atau Dataproc API.Jalankan tugas Flink menggunakan CLI
flink
yang berjalan di node master cluster Flink.Jalankan Flink di cluster Kerberized
Membuat cluster Dataproc Flink
Anda dapat menggunakan konsol Google Cloud, Google Cloud CLI, atau Dataproc API untuk membuat cluster Dataproc dengan komponen Flink yang diaktifkan di cluster.
Rekomendasi: Gunakan cluster VM standar 1 master dengan komponen Flink. Cluster mode Ketersediaan Tinggi Dataproc (dengan 3 VM master) tidak mendukung mode ketersediaan tinggi Flink.
Konsol
Untuk membuat cluster Dataproc Flink menggunakan Konsol Google Cloud, lakukan langkah-langkah berikut:
Buka halaman Dataproc Buat cluster Dataproc di Compute Engine.
- Panel Siapkan cluster dipilih.
- Di bagian Pembuatan Versi, konfirmasi atau ubah
Jenis dan Versi Image. Versi image cluster menentukan versi komponen Flink yang diinstal di cluster.
- Versi image harus 1.5 atau lebih tinggi untuk mengaktifkan komponen Flink di cluster (Lihat Versi Dataproc yang didukung untuk melihat daftar versi komponen yang disertakan dalam setiap rilis image Dataproc).
- Versi gambar harus [TBD] atau lebih tinggi untuk menjalankan tugas Flink melalui Dataproc Jobs API (lihat Menjalankan tugas Dataproc Flink).
- Di bagian Komponen:
- Di bagian Gateway Komponen, pilih Aktifkan gateway komponen. Anda harus mengaktifkan Gateway Komponen untuk mengaktifkan link Gateway Komponen ke UI Server Histori Flink. Mengaktifkan Gateway Komponen juga akan mengaktifkan akses ke antarmuka web Flink Job Manager yang berjalan di cluster Flink.
- Di bagian Optional components, pilih Flink dan komponen opsional lainnya untuk diaktifkan di cluster Anda.
- Di bagian Pembuatan Versi, konfirmasi atau ubah
Jenis dan Versi Image. Versi image cluster menentukan versi komponen Flink yang diinstal di cluster.
Klik panel Sesuaikan cluster (opsional).
Di bagian Cluster properties, klik Add Properties untuk setiap properti cluster opsional yang ingin ditambahkan ke cluster Anda. Anda dapat menambahkan properti berawalan
flink
untuk mengonfigurasi properti Flink di/etc/flink/conf/flink-conf.yaml
yang akan bertindak sebagai default untuk aplikasi Flink yang Anda jalankan di cluster.Contoh:
- Tetapkan
flink:historyserver.archive.fs.dir
untuk menentukan lokasi Cloud Storage guna menulis file histori tugas Flink (lokasi ini akan digunakan oleh Server Histori Flink yang berjalan di cluster Flink). - Setel slot tugas Flink dengan
flink:taskmanager.numberOfTaskSlots=n
.
- Tetapkan
Di bagian Metadata cluster kustom, klik Tambahkan Metadata untuk menambahkan metadata opsional. Misalnya, tambahkan
flink-start-yarn-session
true
untuk menjalankan daemon Flink YARN (/usr/bin/flink-yarn-daemon
) di latar belakang pada node master cluster untuk memulai sesi Flink YARN (lihat Mode sesi Flink).
Jika Anda menggunakan image Dataproc versi 2.0 atau yang lebih lama, klik panel Manage security (optional), lalu di bagian Project access, pilih
Enables the cloud-platform scope for this cluster
. Cakupancloud-platform
diaktifkan secara default saat Anda membuat cluster yang menggunakan gambar Dataproc versi 2.1 atau yang lebih baru.
- Panel Siapkan cluster dipilih.
Klik Create untuk membuat cluster.
gcloud
Untuk membuat cluster Dataproc Flink menggunakan gcloud CLI, jalankan perintah gcloud dataproc clusters create berikut secara lokal di jendela terminal atau di Cloud Shell:
gcloud dataproc clusters create CLUSTER_NAME \ --region=REGION \ --image-version=DATAPROC_IMAGE_VERSION \ --optional-components=FLINK \ --enable-component-gateway \ --properties=PROPERTIES ... other flags
Catatan:
- CLUSTER_NAME: Menentukan nama cluster.
- REGION: Menentukan region Compute Engine tempat cluster akan ditempatkan.
DATAPROC_IMAGE_VERSION: Secara opsional, menentukan versi image yang akan digunakan di cluster. Versi image cluster menentukan versi komponen Flink yang diinstal di cluster.
Versi image harus 1.5 atau lebih tinggi untuk mengaktifkan komponen Flink di cluster (Lihat Versi Dataproc yang didukung untuk melihat daftar versi komponen yang disertakan dalam setiap rilis image Dataproc).
Versi gambar harus [TBD] atau lebih tinggi untuk menjalankan tugas Flink melalui Dataproc Jobs API (lihat Menjalankan tugas Dataproc Flink).
--optional-components
: Anda harus menentukan komponenFLINK
untuk menjalankan tugas Flink dan Layanan Web Flink HistoryServer di cluster.--enable-component-gateway
: Anda harus mengaktifkan Gateway Komponen untuk mengaktifkan link Gateway Komponen ke UI Server Histori Flink. Jika Gateway Komponen diaktifkan, akses ke antarmuka web Flink Job Manager yang berjalan di cluster Flink juga akan diaktifkan.PROPERTIES. Secara opsional, tentukan satu atau beberapa properti cluster.
Saat membuat cluster Dataproc dengan versi image
2.0.67
+ dan2.1.15
+, Anda dapat menggunakan flag--properties
untuk mengonfigurasi properti Flink di/etc/flink/conf/flink-conf.yaml
yang akan berfungsi sebagai default untuk aplikasi Flink yang Anda jalankan di cluster.Anda dapat menyetel
flink:historyserver.archive.fs.dir
untuk menentukan lokasi Cloud Storage guna menulis file histori tugas Flink (lokasi ini akan digunakan oleh Server Histori Flink yang berjalan di cluster Flink).Contoh beberapa properti:
--properties=flink:historyserver.archive.fs.dir=gs://my-bucket/my-flink-cluster/completed-jobs,flink:taskmanager.numberOfTaskSlots=2
Tanda lainnya:
- Anda dapat menambahkan flag
--metadata flink-start-yarn-session=true
opsional untuk menjalankan daemon Flink YARN (/usr/bin/flink-yarn-daemon
) di latar belakang pada node master cluster untuk memulai sesi Flink YARN (lihat Mode sesi Flink).
- Anda dapat menambahkan flag
Saat menggunakan versi image 2.0 atau yang lebih lama, Anda dapat menambahkan flag
--scopes=https://www.googleapis.com/auth/cloud-platform
untuk mengaktifkan akses ke Google Cloud API oleh cluster Anda (lihat Praktik terbaik cakupan). Cakupancloud-platform
diaktifkan secara default saat Anda membuat cluster yang menggunakan gambar Dataproc versi 2.1 atau yang lebih baru.
API
Untuk membuat cluster Dataproc Flink menggunakan Dataproc API, kirimkan permintaan clusters.create, sebagai berikut:
Catatan:
Tetapkan SoftwareConfig.Component ke
FLINK
.Anda juga dapat menyetel
SoftwareConfig.imageVersion
untuk menentukan versi gambar yang akan digunakan di cluster. Versi image cluster menentukan versi komponen Flink yang diinstal di cluster.Versi image harus 1.5 atau lebih tinggi untuk mengaktifkan komponen Flink di cluster (Lihat Versi Dataproc yang didukung untuk melihat daftar versi komponen yang disertakan dalam setiap rilis image Dataproc).
Versi gambar harus [TBD] atau lebih tinggi untuk menjalankan tugas Flink melalui Dataproc Jobs API (lihat Menjalankan tugas Dataproc Flink).
Tetapkan EndpointConfig.enableHttpPortAccess ke
true
untuk mengaktifkan link Gateway Komponen ke UI Server Histori Flink. Jika Gateway Komponen diaktifkan, akses ke antarmuka web Flink Job Manager yang berjalan di cluster Flink juga akan diaktifkan.Secara opsional, Anda dapat menyetel
SoftwareConfig.properties
untuk menentukan satu atau beberapa properti cluster.- Anda dapat menentukan properti Flink yang akan berfungsi sebagai default untuk aplikasi Flink yang dijalankan di cluster. Misalnya, Anda dapat menyetel
flink:historyserver.archive.fs.dir
untuk menentukan lokasi Cloud Storage guna menulis file histori tugas Flink (lokasi ini akan digunakan oleh Server Histori Flink yang berjalan di cluster Flink).
- Anda dapat menentukan properti Flink yang akan berfungsi sebagai default untuk aplikasi Flink yang dijalankan di cluster. Misalnya, Anda dapat menyetel
Anda dapat memilih untuk menetapkan:
GceClusterConfig.metadata
. misalnya, untuk menentukanflink-start-yarn-session
true
guna menjalankan daemon Flink YARN (/usr/bin/flink-yarn-daemon
) di latar belakang pada node master cluster untuk memulai sesi Flink YARN (lihat Mode sesi Flink).- GceClusterConfig.serviceAccountScopes
ke
https://www.googleapis.com/auth/cloud-platform
(cakupancloud-platform
) saat menggunakan versi image 2.0 atau yang lebih lama untuk mengaktifkan akses ke Google Cloud API berdasarkan cluster Anda (lihat Praktik terbaik cakupan). Cakupancloud-platform
diaktifkan secara default saat Anda membuat cluster yang menggunakan gambar Dataproc versi 2.1 atau yang lebih baru.
Setelah membuat cluster Flink
- Gunakan link
Flink History Server
di Gateway Komponen untuk melihat Server Histori Flink yang berjalan di cluster Flink. - Gunakan
YARN ResourceManager link
di Gateway Komponen untuk melihat antarmuka web Flink Job Manager yang berjalan di cluster Flink . - Buat Dataproc Persistent History Server untuk melihat file histori tugas Flink yang ditulis oleh cluster Flink yang ada dan dihapus.
Menjalankan tugas Flink menggunakan resource Jobs
Dataproc
Anda dapat menjalankan tugas Flink menggunakan resource Jobs
Dataproc dari Konsol Google Cloud, Google Cloud CLI, atau Dataproc API.
Konsol
Untuk mengirimkan contoh tugas jumlah kata Flink dari konsol:
Buka halaman Kirim tugas Dataproc di Konsol Google Cloud pada browser Anda.
Isi kolom di halaman Kirim tugas:
- Pilih nama Cluster dari daftar cluster.
- Tetapkan Job type ke
Flink
. - Tetapkan Class utama atau jar ke
org.apache.flink.examples.java.wordcount.WordCount
. - Setel File Jar ke
file:///usr/lib/flink/examples/batch/WordCount.jar
.file:///
menunjukkan file yang berada di cluster. Dataproc menginstalWordCount.jar
saat membuat cluster Flink.- Kolom ini juga menerima jalur Cloud Storage (
gs://BUCKET/JARFILE
) atau jalur Hadoop Distributed File System (HDFS) (hdfs://PATH_TO_JAR
).
Klik Submit.
- Output pengemudi tugas ditampilkan di halaman Detail tugas.
- Tugas flink tercantum di halaman Jobs Dataproc di Konsol Google Cloud.
- Klik Berhenti atau Hapus dari halaman Tugas atau Detail tugas untuk menghentikan atau menghapus tugas.
gcloud
Untuk mengirimkan tugas Flink ke cluster Dataproc Flink, jalankan perintah gcloud dataproc jobs submit gcloud CLI secara lokal di jendela terminal atau di Cloud Shell.
gcloud dataproc jobs submit flink \ --cluster=CLUSTER_NAME \ --region=REGION \ --class=MAIN_CLASS \ --jar=JAR_FILE \ -- JOB_ARGS
Catatan:
- CLUSTER_NAME: Tentukan nama cluster Dataproc Flink yang akan dikirimi tugas.
- REGION: Menentukan region Compute Engine tempat cluster berada.
- MAIN_CLASS: Tentukan class
main
dari aplikasi Flink Anda, seperti:org.apache.flink.examples.java.wordcount.WordCount
- JAR_FILE: Menentukan file jar aplikasi Flink. Anda dapat menentukan:
- File jar yang diinstal di cluster, menggunakan awalan
file:///`:
file:///usr/lib/flink/examples/streaming/TopSpeedWindowing.jar
file:///usr/lib/flink/examples/batch/WordCount.jar
- File jar di Cloud Storage:
gs://BUCKET/JARFILE
- File jar di HDFS:
hdfs://PATH_TO_JAR
- File jar yang diinstal di cluster, menggunakan awalan
JOB_ARGS: Secara opsional, tambahkan argumen tugas setelah tanda hubung ganda (
--
).Setelah mengirimkan tugas, output driver tugas akan ditampilkan di terminal lokal atau Cloud Shell.
Program execution finished Job with JobID 829d48df4ebef2817f4000dfba126e0f has finished. Job Runtime: 13610 ms ... (after,1) (and,12) (arrows,1) (ay,1) (be,4) (bourn,1) (cast,1) (coil,1) (come,1)
REST
Bagian ini menunjukkan cara mengirimkan tugas Flink ke cluster Dataproc Flink menggunakan jobs.submit API Dataproc.
Sebelum menggunakan salah satu data permintaan, lakukan penggantian berikut:
- PROJECT_ID: Project ID Google Cloud
- REGION: region cluster
- CLUSTER_NAME: Tentukan nama cluster Dataproc Flink yang akan dikirimi tugas
Metode HTTP dan URL:
POST https://dataproc.googleapis.com/v1/projects/PROJECT_ID/regions/REGION/jobs:submit
Meminta isi JSON:
{ "job": { "placement": { "clusterName": "CLUSTER_NAME" }, "flinkJob": { "mainClass": "org.apache.flink.examples.java.wordcount.WordCount", "jarFileUris": [ "file:///usr/lib/flink/examples/batch/WordCount.jar" ] } } }
Untuk mengirim permintaan Anda, perluas salah satu opsi berikut:
Anda akan melihat respons JSON seperti berikut:
{ "reference": { "projectId": "PROJECT_ID", "jobId": "JOB_ID" }, "placement": { "clusterName": "CLUSTER_NAME", "clusterUuid": "CLUSTER_UUID" }, "flinkJob": { "mainClass": "org.apache.flink.examples.java.wordcount.WordCount", "args": [ "1000" ], "jarFileUris": [ "file:///usr/lib/flink/examples/batch/WordCount.jar" ] }, "status": { "state": "PENDING", "stateStartTime": "2020-10-07T20:16:21.759Z" }, "jobUuid": "JOB_UUID" }
- Tugas flink tercantum di halaman Jobs Dataproc di Konsol Google Cloud.
- Anda dapat mengklik Stop atau Delete dari halaman Jobs atau Job details di Konsol Google Cloud untuk menghentikan atau menghapus tugas.
Menjalankan tugas Flink menggunakan CLI flink
Daripada menjalankan tugas Flink menggunakan resource Jobs
Dataproc, Anda dapat menjalankan tugas Flink di node master cluster Flink menggunakan CLI flink
.
Bagian berikut menjelaskan berbagai cara menjalankan tugas CLI flink
di cluster Dataproc Flink.
SSH ke node master: Gunakan utilitas SSH untuk membuka jendela terminal di VM master cluster.
Setel classpath: Lakukan inisialisasi classpath Hadoop dari jendela terminal SSH pada VM master cluster Flink:
export HADOOP_CLASSPATH=$(hadoop classpath)
Menjalankan tugas Flink: Anda dapat menjalankan tugas Flink dalam berbagai mode deployment pada YARN: mode aplikasi, per tugas, dan sesi.
Mode aplikasi: Mode Aplikasi Flink didukung oleh gambar Dataproc versi 2.0 dan yang lebih baru. Mode ini mengeksekusi metode
main()
tugas pada YARN Job Manager. Cluster dinonaktifkan setelah tugas selesai.Contoh kiriman pekerjaan:
flink run-application \ -t yarn-application \ -Djobmanager.memory.process.size=1024m \ -Dtaskmanager.memory.process.size=2048m \ -Djobmanager.heap.mb=820 \ -Dtaskmanager.heap.mb=1640 \ -Dtaskmanager.numberOfTaskSlots=2 \ -Dparallelism.default=4 \ /usr/lib/flink/examples/batch/WordCount.jar
Menampilkan daftar tugas yang berjalan:
./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
Membatalkan tugas yang sedang berjalan:
./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
Mode per tugas: Mode Flink ini mengeksekusi metode
main()
tugas di sisi klien.Contoh kiriman pekerjaan:
flink run \ -m yarn-cluster \ -p 4 \ -ys 2 \ -yjm 1024m \ -ytm 2048m \ /usr/lib/flink/examples/batch/WordCount.jar
Mode sesi: Memulai sesi Flink YARN yang berjalan lama, lalu mengirim satu atau beberapa tugas ke sesi tersebut.
Mulai sesi: Anda dapat memulai sesi Flink dengan salah satu cara berikut:
Buat cluster Flink, dengan menambahkan flag
--metadata flink-start-yarn-session=true
ke perintahgcloud dataproc clusters create
(Lihat Membuat cluster Dataproc Flink). Dengan mengaktifkan flag ini, setelah cluster dibuat, Dataproc akan menjalankan/usr/bin/flink-yarn-daemon
untuk memulai sesi Flink pada cluster.ID aplikasi YARN sesi disimpan di
/tmp/.yarn-properties-${USER}
. Anda dapat menampilkan daftar ID dengan perintahyarn application -list
.Jalankan skrip Flink
yarn-session.sh
, yang sudah terinstal sebelumnya di VM master cluster, dengan setelan kustom:Contoh dengan setelan kustom:
/usr/lib/flink/bin/yarn-session.sh \ -s 1 \ -jm 1024m \ -tm 2048m \ -nm flink-dataproc \ --detached
Jalankan skrip wrapper
/usr/bin/flink-yarn-daemon
Flink dengan setelan default:. /usr/bin/flink-yarn-daemon
Mengirim tugas ke sesi: Jalankan perintah berikut untuk mengirim tugas Flink ke sesi.
flink run -m <var>FLINK_MASTER_URL</var>/usr/lib/flink/examples/batch/WordCount.jar
- FLINK_MASTER_URL: URL, termasuk host dan port, VM master Flink tempat tugas dieksekusi.
Hapus
http:// prefix
dari URL. URL ini tercantum dalam output perintah saat Anda memulai sesi Flink. Anda dapat menjalankan perintah berikut untuk mencantumkan URL ini di kolomTracking-URL
:
yarn application -list -appId=<yarn-app-id> | sed 's#http://##' ```
- FLINK_MASTER_URL: URL, termasuk host dan port, VM master Flink tempat tugas dieksekusi.
Hapus
Menampilkan daftar tugas dalam sesi: Untuk mencantumkan tugas Flink dalam sebuah sesi, lakukan salah satu tindakan berikut:
Menjalankan
flink list
tanpa argumen. Perintah ini mencari ID aplikasi YARN sesi di/tmp/.yarn-properties-${USER}
.Dapatkan ID aplikasi YARN sesi dari
/tmp/.yarn-properties-${USER}
atau outputyarn application -list
, lalu jalankan<code>
daftar flink -yid YARN_APPLICATION_ID.Jalankan
flink list -m FLINK_MASTER_URL
.
Menghentikan sesi: Untuk menghentikan sesi, dapatkan ID aplikasi YARN sesi dari
/tmp/.yarn-properties-${USER}
atau outputyarn application -list
, lalu jalankan salah satu perintah berikut:echo "stop" | /usr/lib/flink/bin/yarn-session.sh -id YARN_APPLICATION_ID
yarn application -kill YARN_APPLICATION_ID
Menjalankan tugas Apache Beam di Flink
Anda dapat menjalankan tugas Apache Beam di
Dataproc menggunakan
FlinkRunner
.
Anda dapat menjalankan tugas Beam di Flink dengan cara berikut:
- Tugas Java Beam
- Pekerjaan Beam Portabel
Tugas Java Beam
Kemas tugas Beam Anda ke dalam file JAR. Sediakan file JAR yang dipaketkan dengan dependensi yang diperlukan untuk menjalankan tugas.
Contoh berikut menjalankan tugas Java Beam dari node master cluster Dataproc.
Buat cluster Dataproc dengan mengaktifkan komponen Flink.
gcloud dataproc clusters create CLUSTER_NAME \ --optional-components=FLINK \ --image-version=DATAPROC_IMAGE_VERSION \ --region=REGION \ --enable-component-gateway \ --scopes=https://www.googleapis.com/auth/cloud-platform
--optional-components
: Flink.--image-version
: versi image cluster, yang menentukan versi Flink yang diinstal pada cluster (misalnya, lihat versi komponen Apache Flink yang tercantum untuk empat versi rilis image 2.0.x terbaru dan sebelumnya).--region
: region Dataproc yang didukung.--enable-component-gateway
: mengaktifkan akses ke UI Flink Job Manager.--scopes
: mengaktifkan akses ke Google Cloud API berdasarkan cluster Anda (lihat Praktik terbaik cakupan). Cakupancloud-platform
diaktifkan secara default (Anda tidak perlu menyertakan setelan flag ini) saat membuat cluster yang menggunakan gambar Dataproc versi 2.1 atau yang lebih baru.
Gunakan utilitas SSH untuk membuka jendela terminal pada node master cluster Flink.
Mulai sesi Flink YARN pada node master cluster Dataproc.
. /usr/bin/flink-yarn-daemon
Catat versi Flink di cluster Dataproc Anda.
flink --version
Di komputer lokal, buat contoh jumlah kata Beam kanonis di Java.
Pilih versi Beam yang kompatibel dengan versi Flink di cluster Dataproc Anda. Lihat tabel Kompatibilitas Versi Flink yang mencantumkan kompatibilitas versi Beam-Flink.
Buka file POM yang dihasilkan. Periksa versi runner Beam Flink yang ditentukan oleh tag
<flink.artifact.name>
. Jika versi runner Beam Flink di nama artefak Flink tidak cocok dengan versi Flink di cluster Anda, perbarui nomor versi agar cocok.mvn archetype:generate \ -DarchetypeGroupId=org.apache.beam \ -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \ -DarchetypeVersion=BEAM_VERSION \ -DgroupId=org.example \ -DartifactId=word-count-beam \ -Dversion="0.1" \ -Dpackage=org.apache.beam.examples \ -DinteractiveMode=false
Memaketkan contoh jumlah kata.
mvn package -Pflink-runner
Upload file JAR uber yang dipaketkan,
word-count-beam-bundled-0.1.jar
(~135 MB) ke node master cluster Dataproc Anda. Anda dapat menggunakangsutil cp
untuk transfer file yang lebih cepat ke cluster Dataproc dari Cloud Storage.Di terminal lokal Anda, buat bucket Cloud Storage, lalu upload JAR uber.
gsutil mb BUCKET_NAME
gsutil cp target/word-count-beam-bundled-0.1.jar gs://BUCKET_NAME/
Pada node master Dataproc, download JAR uber.
gsutil cp gs://BUCKET_NAME/word-count-beam-bundled-0.1.jar .
Jalankan tugas Java Beam pada node master cluster Dataproc.
flink run -c org.apache.beam.examples.WordCount word-count-beam-bundled-0.1.jar \ --runner=FlinkRunner \ --output=gs://BUCKET_NAME/java-wordcount-out
Periksa apakah hasilnya sudah ditulis ke bucket Cloud Storage Anda.
gsutil cat gs://BUCKET_NAME/java-wordcount-out-SHARD_ID
Hentikan sesi Flink YARN.
yarn application -list
yarn application -kill YARN_APPLICATION_ID
Pekerjaan Balok Portabel
Untuk menjalankan tugas Beam yang ditulis dalam Python, Go, dan bahasa lain yang didukung, Anda dapat menggunakan FlinkRunner
dan PortableRunner
seperti yang dijelaskan di halaman Flink Runner Beam (lihat juga Roadmap Framework Portabilitas).
Contoh berikut menjalankan tugas Beam portabel di Python dari node master cluster Dataproc.
Buat cluster Dataproc dengan mengaktifkan komponen Flink dan Docker.
gcloud dataproc clusters create CLUSTER_NAME \ --optional-components=FLINK,DOCKER \ --image-version=DATAPROC_IMAGE_VERSION \ --region=REGION \ --enable-component-gateway \ --scopes=https://www.googleapis.com/auth/cloud-platform
Catatan:
--optional-components
: Flink dan Docker.--image-version
: Versi image cluster, yang menentukan versi Flink yang diinstal pada cluster (misalnya, lihat versi komponen Apache Flink yang tercantum untuk empat versi rilis image 2.0.x terbaru dan sebelumnya.--region
: Region Dataproc yang tersedia.--enable-component-gateway
: Mengaktifkan akses ke UI Flink Job Manager.--scopes
: Mengaktifkan akses ke Google Cloud API berdasarkan cluster Anda (lihat Praktik terbaik cakupan). Cakupancloud-platform
diaktifkan secara default (Anda tidak perlu menyertakan setelan flag ini) saat membuat cluster yang menggunakan gambar Dataproc versi 2.1 atau yang lebih baru.
Gunakan
gsutil
secara lokal atau di Cloud Shell untuk membuat bucket Cloud Storage. Anda akan menentukan BUCKET_NAME saat menjalankan contoh program jumlah kata.gsutil mb BUCKET_NAME
Di jendela terminal pada VM cluster, mulai sesi Flink YARN. Catat URL master Flink, alamat master Flink tempat tugas dijalankan. Anda akan menentukan FLINK_MASTER_URL saat menjalankan contoh program jumlah kata.
. /usr/bin/flink-yarn-daemon
Tampilkan dan perhatikan versi Flink yang menjalankan cluster Dataproc. Anda akan menentukan FLINK_VERSION saat menjalankan contoh program jumlah kata.
flink --version
Instal library Python yang diperlukan untuk tugas di node master cluster.
Instal Versi Beam yang kompatibel dengan versi Flink di cluster.
python -m pip install apache-beam[gcp]==BEAM_VERSION
Jalankan contoh jumlah kata di node master cluster.
python -m apache_beam.examples.wordcount \ --runner=FlinkRunner \ --flink_version=FLINK_VERSION \ --flink_master=FLINK_MASTER_URL --flink_submit_uber_jar \ --output=gs://BUCKET_NAME/python-wordcount-out
Catatan:
--runner
:FlinkRunner
.--flink_version
: FLINK_VERSION, seperti yang disebutkan sebelumnya.--flink_master
: FLINK_MASTER_URL, seperti yang disebutkan sebelumnya.--flink_submit_uber_jar
: Menggunakan JAR uber untuk menjalankan tugas Beam.--output
: BUCKET_NAME, dibuat sebelumnya.
Verifikasi bahwa hasil telah ditulis ke bucket Anda.
gsutil cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID
Hentikan sesi Flink YARN.
- Dapatkan ID aplikasi.
yarn application -list
1. Insert the <var>YARN_APPLICATION_ID</var>, then stop the session.yarn application -kill
Jalankan Flink di cluster Kerberized
Komponen Dataproc Flink mendukung Cluster Kerberized. Tiket Kerberos yang valid diperlukan untuk mengirimkan dan mempertahankan tugas Flink atau memulai cluster Flink. Secara default, tiket Kerberos tetap berlaku selama tujuh hari.
Mengakses UI Flink Job Manager
Antarmuka web Flink Job Manager tersedia saat tugas Flink atau cluster sesi Flink sedang berjalan. Untuk menggunakan antarmuka web:
- Buat cluster Dataproc Flink.
- Setelah pembuatan cluster, klik Gateway Komponen link YARN ResourceManager di tab Antarmuka Web pada halaman Detail cluster di Konsol Google Cloud.
- Di UI YARN Resource Manager, identifikasi entri aplikasi cluster Flink. Bergantung pada status penyelesaian tugas, link ApplicationMaster atau History akan dicantumkan.
- Untuk tugas streaming yang berjalan lama, klik link ApplicationManager untuk membuka dasbor Flink; untuk tugas yang sudah selesai, klik link History untuk melihat detail tugas.