當您使用選用元件功能建立 Dataproc 叢集時,可以啟用 Flink 等其他元件。本頁說明如何啟用 Apache Flink 選用元件 (Flink 叢集),然後建立 Dataproc 叢集,並在叢集中執行 Flink 工作。
您可以使用 Flink 叢集來執行以下操作:
使用 Dataproc
Jobs
資源,透過 Google Cloud 控制台、Google Cloud CLI 或 Dataproc API 執行 Flink 工作。使用
flink
CLI 執行 Flink 工作,該 CLI 會在 Flink 叢集主節點上執行。
建立 Dataproc Flink 叢集
您可以使用 Google Cloud 主控台、Google Cloud CLI 或 Dataproc API,建立在叢集中啟用 Flink 元件的 Dataproc 叢集。
建議:使用含有 Flink 元件的標準 1 個主控台 VM 叢集。Dataproc 高可用性模式叢集 (含 3 個主要 VM) 不支援 Flink 高可用性模式。
控制台
如要使用 Google Cloud 控制台建立 Dataproc Flink 叢集,請執行下列步驟:
開啟 Dataproc「在 Compute Engine 中建立 Dataproc 叢集」頁面。
- 選取「設定叢集」面板。
- 在「版本控制」部分中,確認或變更「圖片類型和版本」。叢集映像檔版本會決定叢集中安裝的 Flink 元件版本。
- 映像檔版本必須為 1.5 以上,才能在叢集中啟用 Flink 元件 (請參閱「支援的 Dataproc 版本,查看每個 Dataproc 映像檔版本中包含的元件版本清單)。
- 如要透過 Dataproc Jobs API 執行 Flink 工作,映像檔版本必須為 [TBD] 以上版本 (請參閱「執行 Dataproc Flink 工作」)。
- 在「Components」部分中:
- 在「元件閘道」下方,選取「啟用元件閘道」。您必須啟用元件閘道,才能啟用連結至 Flink 記錄伺服器 UI 的元件閘道。啟用元件閘道後,您還能存取在 Flink 叢集中執行的 Flink Job Manager 網路介面。
- 在「選用元件」下方,選取要在叢集中啟用的 Flink 和其他選用元件。
- 在「版本控制」部分中,確認或變更「圖片類型和版本」。叢集映像檔版本會決定叢集中安裝的 Flink 元件版本。
按一下「Customize cluster (optional)」(自訂叢集) 面板。
在「叢集屬性」部分,按一下每個選用的「叢集屬性」,將其新增至叢集。您可以新增
flink
前置字元的屬性,在/etc/flink/conf/flink-conf.yaml
中設定 Flink 屬性,這些屬性會做為叢集中執行的 Flink 應用程式預設值。範例:
- 設定
flink:historyserver.archive.fs.dir
以指定 Cloud Storage 位置,用於寫入 Flink 工作記錄檔案 (這個位置會由在 Flink 叢集中執行的 Flink 記錄伺服器使用)。 - 使用
flink:taskmanager.numberOfTaskSlots=n
設定 Flink 工作時段。
- 設定
在「Custom cluster metadata」部分,按一下「Add Metadata」,即可新增選用中繼資料。舉例來說,您可以新增
flink-start-yarn-session
true
,在叢集主節點的背景中執行 Flink YARN 守護程序 (/usr/bin/flink-yarn-daemon
),以便啟動 Flink YARN 工作階段 (請參閱「Flink 工作階段模式」)。
如果您使用的是 Dataproc 映像檔 2.0 以下版本,請按一下「管理安全性 (選用)」面板,然後在「專案存取權」下方選取
Enables the cloud-platform scope for this cluster
。建立使用 Dataproc 映像檔 2.1 以上版本的叢集時,系統預設會啟用cloud-platform
範圍。
- 選取「設定叢集」面板。
點選「建立」來建立叢集。
gcloud
如要使用 gcloud CLI 建立 Dataproc Flink 叢集,請在本機的終端機視窗或 Cloud Shell 中執行下列 gcloud dataproc clusters create 指令:
gcloud dataproc clusters create CLUSTER_NAME \ --region=REGION \ --image-version=DATAPROC_IMAGE_VERSION \ --optional-components=FLINK \ --enable-component-gateway \ --properties=PROPERTIES ... other flags
注意:
- CLUSTER_NAME:指定叢集名稱。
- REGION:指定叢集所在的 Compute Engine 區域。
DATAPROC_IMAGE_VERSION:您可以選擇指定要用於叢集的圖片版本。叢集映像檔版本會決定叢集中安裝的 Flink 元件版本。
映像檔版本必須為 1.5 以上,才能在叢集中啟用 Flink 元件 (請參閱「支援的 Dataproc 版本」,查看每個 Dataproc 映像檔版本中包含的元件版本清單)。
如要透過 Dataproc Jobs API 執行 Flink 工作,映像檔版本必須為 [TBD] 以上版本 (請參閱「執行 Dataproc Flink 工作」)。
--optional-components
:您必須指定FLINK
元件,才能在叢集上執行 Flink 工作和 Flink HistoryServer Web Service。--enable-component-gateway
:您必須啟用元件閘道,才能啟用元件閘道連結至 Flink 歷史記錄伺服器 UI。啟用元件閘道後,您還能存取在 Flink 叢集中執行的 Flink 工作管理員網頁介面。PROPERTIES:視需要指定一或多個叢集屬性。
使用映像檔版本
2.0.67
+ 和2.1.15
+ 建立 Dataproc 叢集時,您可以使用--properties
標記在/etc/flink/conf/flink-conf.yaml
中設定 Flink 屬性,這些屬性會做為您在叢集中執行的 Flink 應用程式預設值。您可以設定
flink:historyserver.archive.fs.dir
,指定 Cloud Storage 位置來寫入 Flink 工作記錄檔案 (這個位置會由在 Flink 叢集中執行的 Flink 記錄伺服器使用)。多個資源範例:
--properties=flink:historyserver.archive.fs.dir=gs://my-bucket/my-flink-cluster/completed-jobs,flink:taskmanager.numberOfTaskSlots=2
其他旗標:
- 您可以新增選用的
--metadata flink-start-yarn-session=true
標記,在叢集主節點的背景中執行 Flink YARN 守護程序 (/usr/bin/flink-yarn-daemon
),以啟動 Flink YARN 工作階段 (請參閱「Flink 工作階段模式」)。
- 您可以新增選用的
使用 2.0 以下版本的映像檔時,您可以新增
--scopes=https://www.googleapis.com/auth/cloud-platform
標記,讓叢集能夠存取 Google Cloud API (請參閱範圍最佳做法)。建立使用 Dataproc 映像檔 2.1 以上版本的叢集時,系統預設會啟用cloud-platform
範圍。
API
如要使用 Dataproc API 建立 Dataproc Flink 叢集,請提交 clusters.create 要求,如下所示:
注意:
將 SoftwareConfig.Component 設為
FLINK
。您可以視需要設定
SoftwareConfig.imageVersion
,指定叢集中要使用的映像檔版本。叢集映像檔版本會決定叢集中安裝的 Flink 元件版本。映像檔版本必須為 1.5 以上,才能在叢集中啟用 Flink 元件 (請參閱「支援的 Dataproc 版本」,查看每個 Dataproc 映像檔版本中包含的元件版本清單)。
如要透過 Dataproc Jobs API 執行 Flink 工作,映像檔版本必須為 [TBD] 以上版本 (請參閱「執行 Dataproc Flink 工作」)。
將 EndpointConfig.enableHttpPortAccess 設為
true
,即可啟用元件閘道連結至 Flink 記錄伺服器 UI。啟用元件閘道後,您還能存取在 Flink 叢集中執行的 Flink 工作管理員網頁介面。您可以選擇設定
SoftwareConfig.properties
,指定一或多個叢集屬性。- 您可以指定 Flink 屬性,做為在叢集中執行的 Flink 應用程式預設值。舉例來說,您可以設定
flink:historyserver.archive.fs.dir
,指定 Cloud Storage 位置來寫入 Flink 工作記錄檔案 (這個位置會由在 Flink 叢集中執行的 Flink 記錄伺服器使用)。
- 您可以指定 Flink 屬性,做為在叢集中執行的 Flink 應用程式預設值。舉例來說,您可以設定
您可以選擇設定下列項目:
GceClusterConfig.metadata
。舉例來說,如要指定flink-start-yarn-session
true
,以便在叢集主節點的背景中執行 Flink YARN 守護程序 (/usr/bin/flink-yarn-daemon
),以便啟動 Flink YARN 工作階段 (請參閱「Flink 工作階段模式」)。- 使用 2.0 以下版本的映像檔時,將 GceClusterConfig.serviceAccountScopes 設為
https://www.googleapis.com/auth/cloud-platform
(cloud-platform
範圍),讓叢集能夠存取 Google CloudAPI (請參閱範圍最佳做法)。建立使用 Dataproc 映像檔 2.1 以上版本的叢集時,系統預設會啟用cloud-platform
範圍。
建立 Flink 叢集後
- 使用 元件閘道中的
Flink History Server
連結,查看在 Flink 叢集中執行的 Flink 記錄伺服器。 - 使用元件閘道中的
YARN ResourceManager link
,即可查看在 Flink 叢集中執行的 Flink Job Manager 網路介面。 - 建立 Dataproc 永久記錄伺服器,即可查看由現有和已刪除的 Flink 叢集寫入的 Flink 工作記錄檔案。
使用 Dataproc Jobs
資源執行 Flink 工作
您可以使用 Dataproc Jobs
資源,透過Google Cloud 主控台、Google Cloud CLI 或 Dataproc API 執行 Flink 工作。
控制台
如要透過控制台提交 Flink 字數統計範例工作,請按照下列步驟操作:
填寫「Submit a job」(提交工作) 頁面中的欄位:
- 從叢集清單選取您的「Cluster」(叢集) 名稱。
- 將「Job type」(工作類型) 設為
Flink
。 - 將「Main class or jar」(主要類別或 jar) 設為
org.apache.flink.examples.java.wordcount.WordCount
。 - 將「Jar files」設為
file:///usr/lib/flink/examples/batch/WordCount.jar
。file:///
代表位於叢集中的檔案。Dataproc 在建立 Flink 叢集時安裝了WordCount.jar
。- 這個欄位也接受 Cloud Storage 路徑 (
gs://BUCKET/JARFILE
) 或 Hadoop 分散式檔案系統 (HDFS) 路徑 (hdfs://PATH_TO_JAR
)。
按一下「提交」。
- 工作驅動程式輸出內容會顯示在「Job details」頁面上。
- Flink 工作會列在 Google Cloud 主控台的 Dataproc「Jobs」頁面中。
- 在「工作」或「工作詳細資料」頁面中,按一下「停止」或「刪除」,即可停止或刪除工作。
gcloud
如要將 Flink 工作提交至 Dataproc Flink 叢集,請在本機終端機視窗或 Cloud Shell 中執行 gcloud CLI gcloud dataproc jobs submit 指令。
gcloud dataproc jobs submit flink \ --cluster=CLUSTER_NAME \ --region=REGION \ --class=MAIN_CLASS \ --jar=JAR_FILE \ -- JOB_ARGS
注意:
- CLUSTER_NAME:指定要提交工作的 Dataproc Flink 叢集名稱。
- REGION:指定叢集所在的 Compute Engine 區域。
- MAIN_CLASS:指定 Flink 應用程式的
main
類別,例如:org.apache.flink.examples.java.wordcount.WordCount
- JAR_FILE:指定 Flink 應用程式 Jar 檔案。您可以指定:
- 使用
file:///` 前置字串在叢集中安裝的 jar 檔案:
file:///usr/lib/flink/examples/streaming/TopSpeedWindowing.jar
file:///usr/lib/flink/examples/batch/WordCount.jar
- Cloud Storage 中的 JAR 檔案:
gs://BUCKET/JARFILE
- HDFS 中的 jar 檔案:
hdfs://PATH_TO_JAR
- 使用
JOB_ARGS:您可以選擇在雙連字號 (
--
) 後方新增工作引數。提交工作後,本機或 Cloud Shell 終端機會顯示工作驅動程式輸出內容。
Program execution finished Job with JobID 829d48df4ebef2817f4000dfba126e0f has finished. Job Runtime: 13610 ms ... (after,1) (and,12) (arrows,1) (ay,1) (be,4) (bourn,1) (cast,1) (coil,1) (come,1)
REST
本節說明如何使用 Dataproc jobs.submit API,將 Flink 工作提交至 Dataproc Flink 叢集。
使用任何要求資料之前,請先替換以下項目:
- PROJECT_ID: Google Cloud 專案 ID
- REGION:叢集區域
- CLUSTER_NAME:指定要將工作提交至哪個 Dataproc Flink 叢集
HTTP 方法和網址:
POST https://dataproc.googleapis.com/v1/projects/PROJECT_ID/regions/REGION/jobs:submit
JSON 要求主體:
{ "job": { "placement": { "clusterName": "CLUSTER_NAME" }, "flinkJob": { "mainClass": "org.apache.flink.examples.java.wordcount.WordCount", "jarFileUris": [ "file:///usr/lib/flink/examples/batch/WordCount.jar" ] } } }
如要傳送要求,請展開以下其中一個選項:
您應該會收到如下的 JSON 回應:
{ "reference": { "projectId": "PROJECT_ID", "jobId": "JOB_ID" }, "placement": { "clusterName": "CLUSTER_NAME", "clusterUuid": "CLUSTER_UUID" }, "flinkJob": { "mainClass": "org.apache.flink.examples.java.wordcount.WordCount", "args": [ "1000" ], "jarFileUris": [ "file:///usr/lib/flink/examples/batch/WordCount.jar" ] }, "status": { "state": "PENDING", "stateStartTime": "2020-10-07T20:16:21.759Z" }, "jobUuid": "JOB_UUID" }
- Flink 工作會列在 Google Cloud 主控台的 Dataproc「Jobs」頁面中。
- 您可以從 Google Cloud 控制台的「Jobs」或「Job details」頁面,按一下「Stop」或「Delete」,停止或刪除工作。
使用 flink
CLI 執行 Flink 工作
您可以使用 flink
CLI 在 Flink 叢集的主節點上執行 Flink 工作,不必使用 Dataproc Jobs
資源。
以下各節將說明在 Dataproc Flink 叢集中執行 flink
CLI 工作的方法。
透過 SSH 連線至主要節點:使用 SSH 公用程式,在叢集主要 VM 上開啟終端機視窗。
設定類別路徑:透過 Flink 叢集主 VM 的 SSH 終端機視窗,初始化 Hadoop 類別路徑:
export HADOOP_CLASSPATH=$(hadoop classpath)
執行 Flink 工作:您可以在不同的YARN 部署模式中執行 Flink 工作,包括應用程式、個別工作和工作階段模式。
應用程式模式:Dataproc 2.0 以上版本支援 Flink 應用程式模式。這個模式會在 YARN 工作管理員上執行工作
main()
方法。叢集會在工作完成後關閉。工作提交範例:
flink run-application \ -t yarn-application \ -Djobmanager.memory.process.size=1024m \ -Dtaskmanager.memory.process.size=2048m \ -Djobmanager.heap.mb=820 \ -Dtaskmanager.heap.mb=1640 \ -Dtaskmanager.numberOfTaskSlots=2 \ -Dparallelism.default=4 \ /usr/lib/flink/examples/batch/WordCount.jar
列出執行中的工作:
./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
取消正在執行的工作:
./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
個別工作模式:這個 Flink 模式會在用戶端執行工作的
main()
方法。工作提交範例:
flink run \ -m yarn-cluster \ -p 4 \ -ys 2 \ -yjm 1024m \ -ytm 2048m \ /usr/lib/flink/examples/batch/WordCount.jar
工作階段模式:啟動長時間執行的 Flink YARN 工作階段,然後將一或多個工作提交至工作階段。
啟動工作階段:您可以透過下列任一方式啟動 Flink 工作階段:
建立 Flink 叢集,將
--metadata flink-start-yarn-session=true
標記新增至gcloud dataproc clusters create
指令 (請參閱「建立 Dataproc Flink 叢集」)。啟用這個標記後,叢集建立完成後,Dataproc 就會執行/usr/bin/flink-yarn-daemon
,在叢集中啟動 Flink 工作階段。工作階段的 YARN 應用程式 ID 會儲存在
/tmp/.yarn-properties-${USER}
中。您可以使用yarn application -list
指令列出 ID。執行 Flink
yarn-session.sh
指令碼,該指令碼已預先安裝在叢集主 VM 上,並使用自訂設定:自訂設定範例:
/usr/lib/flink/bin/yarn-session.sh \ -s 1 \ -jm 1024m \ -tm 2048m \ -nm flink-dataproc \ --detached
使用預設設定執行 Flink 的
/usr/bin/flink-yarn-daemon
包裝函式指令碼:. /usr/bin/flink-yarn-daemon
將工作提交至工作階段:執行下列指令,將 Flink 工作提交至工作階段。
flink run -m <var>FLINK_MASTER_URL</var>/usr/lib/flink/examples/batch/WordCount.jar
- FLINK_MASTER_URL:執行工作的 Flink 主 VM 的網址,包括主機和通訊埠。從網址中移除
http:// prefix
。啟動 Flink 工作階段時,這個網址會列在指令輸出內容中。您可以執行下列指令,在Tracking-URL
欄位中列出這個網址:
yarn application -list -appId=<yarn-app-id> | sed 's#http://##' ```
- FLINK_MASTER_URL:執行工作的 Flink 主 VM 的網址,包括主機和通訊埠。從網址中移除
在工作階段中列出工作:如要在工作階段中列出 Flink 工作,請執行下列任一操作:
不含引數的
flink list
執行作業。這個指令會在/tmp/.yarn-properties-${USER}
中尋找工作階段的 YARN 應用程式 ID。從
/tmp/.yarn-properties-${USER}
或yarn application -list
的輸出內容取得工作階段的 YARN 應用程式 ID,然後執行<code>
flink list -yid YARN_APPLICATION_ID。執行
flink list -m FLINK_MASTER_URL
。
停止工作階段:如要停止工作階段,請從
/tmp/.yarn-properties-${USER}
或yarn application -list
的輸出內容取得工作階段的 YARN 應用程式 ID,然後執行下列任一指令:echo "stop" | /usr/lib/flink/bin/yarn-session.sh -id YARN_APPLICATION_ID
yarn application -kill YARN_APPLICATION_ID
在 Flink 上執行 Apache Beam 工作
您可以使用 FlinkRunner
在 Dataproc 上執行 Apache Beam 工作。
您可以透過下列方式在 Flink 上執行 Beam 工作:
- Java Beam 工作
- 可攜式 Beam 工作
Java Beam 工作
將 Beam 工作封裝至 JAR 檔案。提供已套件的 JAR 檔案,其中包含執行工作所需的依附元件。
以下範例會從 Dataproc 叢集的主要節點執行 Java Beam 工作。
建立已啟用 Flink 元件 的 Dataproc 叢集。
gcloud dataproc clusters create CLUSTER_NAME \ --optional-components=FLINK \ --image-version=DATAPROC_IMAGE_VERSION \ --region=REGION \ --enable-component-gateway \ --scopes=https://www.googleapis.com/auth/cloud-platform
--optional-components
:Flink。--image-version
:叢集的映像檔版本,可決定叢集中安裝的 Flink 版本 (例如,請參閱最新和前四個 2.0.x 映像檔版本中列出的 Apache Flink 元件版本)。--region
:支援的 Dataproc 區域。--enable-component-gateway
:啟用 Flink Job Manager UI 存取權。--scopes
:讓叢集 Google Cloud 存取 API (請參閱範圍最佳做法)。建立使用 Dataproc 映像檔 2.1 以上版本的叢集時,系統會預設啟用cloud-platform
範圍 (您不需要加入這個標記設定)。
使用 SSH 公用程式,在 Flink 叢集主要節點上開啟終端機視窗。
在 Dataproc 叢集主要節點上啟動 Flink YARN 工作階段。
. /usr/bin/flink-yarn-daemon
請記下 Dataproc 叢集中的 Flink 版本。
flink --version
在本機電腦上使用 Java 產生標準的 Beam 字數計數範例。
請選擇與 Dataproc 叢集中的 Flink 版本相容的 Beam 版本。請參閱「Flink 版本相容性 」表格,其中列出 Beam-Flink 版本相容性。
開啟產生的 POM 檔案。請檢查標記
<flink.artifact.name>
所指定的 Beam Flink 執行程版本。如果 Flink 成果名稱中的 Beam Flink 執行器版本與叢集中的 Flink 版本不符,請更新版本號碼以便符合。mvn archetype:generate \ -DarchetypeGroupId=org.apache.beam \ -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \ -DarchetypeVersion=BEAM_VERSION \ -DgroupId=org.example \ -DartifactId=word-count-beam \ -Dversion="0.1" \ -Dpackage=org.apache.beam.examples \ -DinteractiveMode=false
封裝字數範例。
mvn package -Pflink-runner
將已封裝的 uber JAR 檔案
word-count-beam-bundled-0.1.jar
(~135 MB) 上傳至 Dataproc 叢集的主節點。您可以使用gcloud storage cp
,加快從 Cloud Storage 傳輸至 Dataproc 叢集中的檔案。在本機終端機上建立 Cloud Storage 值區,然後上傳 uber JAR。
gcloud storage buckets create BUCKET_NAME
gcloud storage cp target/word-count-beam-bundled-0.1.jar gs://BUCKET_NAME/
在 Dataproc 的主要節點上,下載 uber JAR。
gcloud storage cp gs://BUCKET_NAME/word-count-beam-bundled-0.1.jar .
在 Dataproc 叢集的主要節點上執行 Java Beam 工作。
flink run -c org.apache.beam.examples.WordCount word-count-beam-bundled-0.1.jar \ --runner=FlinkRunner \ --output=gs://BUCKET_NAME/java-wordcount-out
確認結果是否已寫入 Cloud Storage 值區。
gcloud storage cat gs://BUCKET_NAME/java-wordcount-out-SHARD_ID
停止 Flink YARN 工作階段。
yarn application -list
yarn application -kill YARN_APPLICATION_ID
可攜式 Beam 工作
如要執行以 Python、Go 和其他支援語言編寫的 Beam 工作,您可以使用 Beam 的 Flink Runner 頁面所述的 FlinkRunner
和 PortableRunner
(請參閱可攜式架構路線圖)。
以下範例會透過 Dataproc 叢集的主要節點,在 Python 中執行可攜式 Beam 工作。
建立 Dataproc 叢集,並啟用 Flink 和 Docker 元件。
gcloud dataproc clusters create CLUSTER_NAME \ --optional-components=FLINK,DOCKER \ --image-version=DATAPROC_IMAGE_VERSION \ --region=REGION \ --enable-component-gateway \ --scopes=https://www.googleapis.com/auth/cloud-platform
注意:
--optional-components
:Flink 和 Docker。--image-version
:叢集的映像檔版本,用於決定叢集中安裝的 Flink 版本 (例如,請參閱最新和前四個 2.0.x 映像檔版本中列出的 Apache Flink 元件版本)。--region
:可用的 Dataproc 區域。--enable-component-gateway
:啟用 Flink 工作管理員 UI 的存取權。--scopes
:啟用叢集對 Google Cloud API 的存取權 (請參閱範圍最佳做法)。建立使用 Dataproc 映像檔 2.1 以上版本的叢集時,系統會預設啟用cloud-platform
範圍 (您不需要加入這個標記設定)。
在本機或 Cloud Shell 中使用 gcloud CLI 建立 Cloud Storage bucket。您會在執行範例字數計算程式時指定 BUCKET_NAME。
gcloud storage buckets create BUCKET_NAME
在叢集 VM 的終端機視窗中,啟動 Flink YARN 工作階段。請記下 Flink 主網址,這是執行工作時的 Flink 主機位址。執行範例字數計數程式時,您會指定 FLINK_MASTER_URL。
. /usr/bin/flink-yarn-daemon
顯示並記下執行 Dataproc 叢集的 Flink 版本。執行範例字數計數程式時,您會指定 FLINK_VERSION。
flink --version
在叢集主節點上安裝工作所需的 Python 程式庫。
安裝與叢集中 Flink 版本相容的Beam 版本。
python -m pip install apache-beam[gcp]==BEAM_VERSION
在叢集主節點上執行字數範例。
python -m apache_beam.examples.wordcount \ --runner=FlinkRunner \ --flink_version=FLINK_VERSION \ --flink_master=FLINK_MASTER_URL --flink_submit_uber_jar \ --output=gs://BUCKET_NAME/python-wordcount-out
注意:
--runner
:FlinkRunner
。--flink_version
:FLINK_VERSION,如前所述。--flink_master
:FLINK_MASTER_URL,如前所述。--flink_submit_uber_jar
:使用 uber JAR 執行 Beam 工作。--output
:先前建立的 BUCKET_NAME。
確認結果是否已寫入值區。
gcloud storage cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID
停止 Flink YARN 工作階段。
- 取得應用程式 ID。
yarn application -list
1. Insert the <var>YARN_APPLICATION_ID</var>, then stop the session.yarn application -kill
在使用 Kerber 的叢集上執行 Flink
Dataproc Flink 元件支援 Kerberos 叢集。如要提交及儲存 Flink 工作,或啟動 Flink 叢集,就必須使用有效的 Kerberos 票證。根據預設,Kerberos 票證的有效期限為七天。
存取 Flink 工作管理工具使用者介面
在 Flink 工作或 Flink 工作階段叢集執行時,您可以使用 Flink Job Manager 網頁介面。如何使用網頁介面:
- 建立 Dataproc Flink 叢集。
- 建立叢集後,請在 Google Cloud 主控台的「Cluster details」(叢集詳細資料) 頁面中,按一下「Web Interface」(網頁介面) 分頁標籤上的「Component Gateway」(元件閘道) YARN ResourceManager 連結。
- 在 YARN Resource Manager UI 中,找出 Flink 叢集應用程式項目。視工作完成狀態而定,系統會列出 ApplicationMaster 或 History 連結。
- 如要查看長時間執行的串流工作,請按一下「ApplicationManager」連結,開啟 Flink 資訊主頁;如要查看已完成的工作,請按一下「History」連結,查看工作詳細資料。