Dataproc 選用 Flink 元件

當您使用選用元件功能建立 Dataproc 叢集時,可以啟用 Flink 等其他元件。本頁說明如何啟用 Apache Flink 選用元件 (Flink 叢集),然後建立 Dataproc 叢集,並在叢集中執行 Flink 工作。

您可以使用 Flink 叢集來執行以下操作:

  1. 使用 Dataproc Jobs 資源,透過 Google Cloud 控制台、Google Cloud CLI 或 Dataproc API 執行 Flink 工作

  2. 使用 flink CLI 執行 Flink 工作,該 CLI 會在 Flink 叢集主節點上執行。

  3. 在 Flink 上執行 Apache Beam 工作

  4. 在使用 Kerber 的叢集上執行 Flink

您可以使用 Google Cloud 主控台、Google Cloud CLI 或 Dataproc API,建立在叢集中啟用 Flink 元件的 Dataproc 叢集。

建議:使用含有 Flink 元件的標準 1 個主控台 VM 叢集。Dataproc 高可用性模式叢集 (含 3 個主要 VM) 不支援 Flink 高可用性模式

  • 使用 元件閘道中的 Flink History Server 連結,查看在 Flink 叢集中執行的 Flink 記錄伺服器。
  • 使用元件閘道中的 YARN ResourceManager link,即可查看在 Flink 叢集中執行的 Flink Job Manager 網路介面
  • 建立 Dataproc 永久記錄伺服器,即可查看由現有和已刪除的 Flink 叢集寫入的 Flink 工作記錄檔案。

您可以使用 Dataproc Jobs 資源,透過Google Cloud 主控台、Google Cloud CLI 或 Dataproc API 執行 Flink 工作。

控制台

如要透過控制台提交 Flink 字數統計範例工作,請按照下列步驟操作:

  1. 在瀏覽器中,前往Google Cloud 控制台的 Dataproc「Submit a job」(提交工作) 頁面。

  2. 填寫「Submit a job」(提交工作) 頁面中的欄位:

    1. 從叢集清單選取您的「Cluster」(叢集) 名稱
    2. 將「Job type」(工作類型) 設為 Flink
    3. 將「Main class or jar」(主要類別或 jar) 設為 org.apache.flink.examples.java.wordcount.WordCount
    4. 將「Jar files」設為 file:///usr/lib/flink/examples/batch/WordCount.jar
      • file:/// 代表位於叢集中的檔案。Dataproc 在建立 Flink 叢集時安裝了 WordCount.jar
      • 這個欄位也接受 Cloud Storage 路徑 (gs://BUCKET/JARFILE) 或 Hadoop 分散式檔案系統 (HDFS) 路徑 (hdfs://PATH_TO_JAR)。
  3. 按一下「提交」

    • 工作驅動程式輸出內容會顯示在「Job details」頁面上。
    • Flink 工作會列在 Google Cloud 主控台的 Dataproc「Jobs頁面中。
    • 在「工作」或「工作詳細資料」頁面中,按一下「停止」或「刪除」,即可停止或刪除工作。

gcloud

如要將 Flink 工作提交至 Dataproc Flink 叢集,請在本機終端機視窗或 Cloud Shell 中執行 gcloud CLI gcloud dataproc jobs submit 指令。

gcloud dataproc jobs submit flink \
    --cluster=CLUSTER_NAME \
    --region=REGION \
    --class=MAIN_CLASS \
    --jar=JAR_FILE \
    -- JOB_ARGS

注意:

  • CLUSTER_NAME:指定要提交工作的 Dataproc Flink 叢集名稱。
  • REGION:指定叢集所在的 Compute Engine 區域
  • MAIN_CLASS:指定 Flink 應用程式的 main 類別,例如:
    • org.apache.flink.examples.java.wordcount.WordCount
  • JAR_FILE:指定 Flink 應用程式 Jar 檔案。您可以指定:
    • 使用 file:///` 前置字串在叢集中安裝的 jar 檔案:
      • file:///usr/lib/flink/examples/streaming/TopSpeedWindowing.jar
      • file:///usr/lib/flink/examples/batch/WordCount.jar
    • Cloud Storage 中的 JAR 檔案: gs://BUCKET/JARFILE
    • HDFS 中的 jar 檔案: hdfs://PATH_TO_JAR
  • JOB_ARGS:您可以選擇在雙連字號 (--) 後方新增工作引數。

  • 提交工作後,本機或 Cloud Shell 終端機會顯示工作驅動程式輸出內容。

    Program execution finished
    Job with JobID 829d48df4ebef2817f4000dfba126e0f has finished.
    Job Runtime: 13610 ms
    ...
    (after,1)
    (and,12)
    (arrows,1)
    (ay,1)
    (be,4)
    (bourn,1)
    (cast,1)
    (coil,1)
    (come,1)

REST

本節說明如何使用 Dataproc jobs.submit API,將 Flink 工作提交至 Dataproc Flink 叢集。

使用任何要求資料之前,請先替換以下項目:

  • PROJECT_ID: Google Cloud 專案 ID
  • REGION叢集區域
  • CLUSTER_NAME:指定要將工作提交至哪個 Dataproc Flink 叢集

HTTP 方法和網址:

POST https://dataproc.googleapis.com/v1/projects/PROJECT_ID/regions/REGION/jobs:submit

JSON 要求主體:

{
  "job": {
    "placement": {
      "clusterName": "CLUSTER_NAME"
    },
    "flinkJob": {
      "mainClass": "org.apache.flink.examples.java.wordcount.WordCount",
      "jarFileUris": [
        "file:///usr/lib/flink/examples/batch/WordCount.jar"
      ]
    }
  }
}

如要傳送要求,請展開以下其中一個選項:

您應該會收到如下的 JSON 回應:

{
  "reference": {
    "projectId": "PROJECT_ID",
    "jobId": "JOB_ID"
  },
  "placement": {
    "clusterName": "CLUSTER_NAME",
    "clusterUuid": "CLUSTER_UUID"
  },
  "flinkJob": {
    "mainClass": "org.apache.flink.examples.java.wordcount.WordCount",
    "args": [
      "1000"
    ],
    "jarFileUris": [
      "file:///usr/lib/flink/examples/batch/WordCount.jar"
    ]
  },
  "status": {
    "state": "PENDING",
    "stateStartTime": "2020-10-07T20:16:21.759Z"
  },
  "jobUuid": "JOB_UUID"
}
Google Cloud Google Cloud
  • Flink 工作會列在 Google Cloud 主控台的 Dataproc「Jobs頁面中。
  • 您可以從 Google Cloud 控制台的「Jobs」或「Job details」頁面,按一下「Stop」或「Delete」,停止或刪除工作。

您可以使用 flink CLI 在 Flink 叢集的主節點上執行 Flink 工作,不必使用 Dataproc Jobs 資源

以下各節將說明在 Dataproc Flink 叢集中執行 flink CLI 工作的方法。

  1. 透過 SSH 連線至主要節點:使用 SSH 公用程式,在叢集主要 VM 上開啟終端機視窗。

  2. 設定類別路徑:透過 Flink 叢集主 VM 的 SSH 終端機視窗,初始化 Hadoop 類別路徑:

    export HADOOP_CLASSPATH=$(hadoop classpath)
    
  3. 執行 Flink 工作:您可以在不同的YARN 部署模式中執行 Flink 工作,包括應用程式、個別工作和工作階段模式。

    1. 應用程式模式:Dataproc 2.0 以上版本支援 Flink 應用程式模式。這個模式會在 YARN 工作管理員上執行工作 main() 方法。叢集會在工作完成後關閉。

      工作提交範例:

      flink run-application \
          -t yarn-application \
          -Djobmanager.memory.process.size=1024m \
          -Dtaskmanager.memory.process.size=2048m \
          -Djobmanager.heap.mb=820 \
          -Dtaskmanager.heap.mb=1640 \
          -Dtaskmanager.numberOfTaskSlots=2 \
          -Dparallelism.default=4 \
          /usr/lib/flink/examples/batch/WordCount.jar
      

      列出執行中的工作:

      ./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
      

      取消正在執行的工作:

      ./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
      
    2. 個別工作模式:這個 Flink 模式會在用戶端執行工作的 main() 方法。

      工作提交範例:

      flink run \
          -m yarn-cluster \
          -p 4 \
          -ys 2 \
          -yjm 1024m \
          -ytm 2048m \
          /usr/lib/flink/examples/batch/WordCount.jar
      
    3. 工作階段模式:啟動長時間執行的 Flink YARN 工作階段,然後將一或多個工作提交至工作階段。

      1. 啟動工作階段:您可以透過下列任一方式啟動 Flink 工作階段:

        1. 建立 Flink 叢集,將 --metadata flink-start-yarn-session=true 標記新增至 gcloud dataproc clusters create 指令 (請參閱「建立 Dataproc Flink 叢集」)。啟用這個標記後,叢集建立完成後,Dataproc 就會執行 /usr/bin/flink-yarn-daemon,在叢集中啟動 Flink 工作階段。

          工作階段的 YARN 應用程式 ID 會儲存在 /tmp/.yarn-properties-${USER} 中。您可以使用 yarn application -list 指令列出 ID。

        2. 執行 Flink yarn-session.sh 指令碼,該指令碼已預先安裝在叢集主 VM 上,並使用自訂設定:

          自訂設定範例:

          /usr/lib/flink/bin/yarn-session.sh \
              -s 1 \
              -jm 1024m \
              -tm 2048m \
              -nm flink-dataproc \
              --detached
          
        3. 使用預設設定執行 Flink 的 /usr/bin/flink-yarn-daemon 包裝函式指令碼:

          . /usr/bin/flink-yarn-daemon
          
      2. 將工作提交至工作階段:執行下列指令,將 Flink 工作提交至工作階段。

        flink run -m <var>FLINK_MASTER_URL</var>/usr/lib/flink/examples/batch/WordCount.jar
        
        • FLINK_MASTER_URL:執行工作的 Flink 主 VM 的網址,包括主機和通訊埠。從網址中移除 http:// prefix啟動 Flink 工作階段時,這個網址會列在指令輸出內容中。您可以執行下列指令,在 Tracking-URL 欄位中列出這個網址:
        yarn application -list -appId=<yarn-app-id> | sed 's#http://##'
           ```
        
      3. 在工作階段中列出工作:如要在工作階段中列出 Flink 工作,請執行下列任一操作:

        • 不含引數的 flink list 執行作業。這個指令會在 /tmp/.yarn-properties-${USER} 中尋找工作階段的 YARN 應用程式 ID。

        • /tmp/.yarn-properties-${USER}yarn application -list 的輸出內容取得工作階段的 YARN 應用程式 ID,然後執行 <code>flink list -yid YARN_APPLICATION_ID

        • 執行 flink list -m FLINK_MASTER_URL

      4. 停止工作階段:如要停止工作階段,請從 /tmp/.yarn-properties-${USER}yarn application -list 的輸出內容取得工作階段的 YARN 應用程式 ID,然後執行下列任一指令:

        echo "stop" | /usr/lib/flink/bin/yarn-session.sh -id YARN_APPLICATION_ID
        
        yarn application -kill YARN_APPLICATION_ID
        

您可以使用 FlinkRunner 在 Dataproc 上執行 Apache Beam 工作。

您可以透過下列方式在 Flink 上執行 Beam 工作:

  1. Java Beam 工作
  2. 可攜式 Beam 工作

Java Beam 工作

將 Beam 工作封裝至 JAR 檔案。提供已套件的 JAR 檔案,其中包含執行工作所需的依附元件。

以下範例會從 Dataproc 叢集的主要節點執行 Java Beam 工作。

  1. 建立已啟用 Flink 元件 的 Dataproc 叢集。

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    
    • --optional-components:Flink。
    • --image-version叢集的映像檔版本,可決定叢集中安裝的 Flink 版本 (例如,請參閱最新和前四個 2.0.x 映像檔版本中列出的 Apache Flink 元件版本)。
    • --region:支援的 Dataproc 區域
    • --enable-component-gateway:啟用 Flink Job Manager UI 存取權。
    • --scopes:讓叢集 Google Cloud 存取 API (請參閱範圍最佳做法)。建立使用 Dataproc 映像檔 2.1 以上版本的叢集時,系統會預設啟用 cloud-platform 範圍 (您不需要加入這個標記設定)。
  2. 使用 SSH 公用程式,在 Flink 叢集主要節點上開啟終端機視窗。

  3. 在 Dataproc 叢集主要節點上啟動 Flink YARN 工作階段。

    . /usr/bin/flink-yarn-daemon
    

    請記下 Dataproc 叢集中的 Flink 版本。

    flink --version
    
  4. 在本機電腦上使用 Java 產生標準的 Beam 字數計數範例

    請選擇與 Dataproc 叢集中的 Flink 版本相容的 Beam 版本。請參閱「Flink 版本相容性 」表格,其中列出 Beam-Flink 版本相容性。

    開啟產生的 POM 檔案。請檢查標記 <flink.artifact.name> 所指定的 Beam Flink 執行程版本。如果 Flink 成果名稱中的 Beam Flink 執行器版本與叢集中的 Flink 版本不符,請更新版本號碼以便符合。

    mvn archetype:generate \
        -DarchetypeGroupId=org.apache.beam \
        -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
        -DarchetypeVersion=BEAM_VERSION \
        -DgroupId=org.example \
        -DartifactId=word-count-beam \
        -Dversion="0.1" \
        -Dpackage=org.apache.beam.examples \
        -DinteractiveMode=false
    
  5. 封裝字數範例。

    mvn package -Pflink-runner
    
  6. 將已封裝的 uber JAR 檔案 word-count-beam-bundled-0.1.jar (~135 MB) 上傳至 Dataproc 叢集的主節點。您可以使用 gcloud storage cp,加快從 Cloud Storage 傳輸至 Dataproc 叢集中的檔案。

    1. 在本機終端機上建立 Cloud Storage 值區,然後上傳 uber JAR。

      gcloud storage buckets create BUCKET_NAME
      
      gcloud storage cp target/word-count-beam-bundled-0.1.jar gs://BUCKET_NAME/
      
    2. 在 Dataproc 的主要節點上,下載 uber JAR。

      gcloud storage cp gs://BUCKET_NAME/word-count-beam-bundled-0.1.jar .
      
  7. 在 Dataproc 叢集的主要節點上執行 Java Beam 工作。

    flink run -c org.apache.beam.examples.WordCount word-count-beam-bundled-0.1.jar \
        --runner=FlinkRunner \
        --output=gs://BUCKET_NAME/java-wordcount-out
    
  8. 確認結果是否已寫入 Cloud Storage 值區。

    gcloud storage cat gs://BUCKET_NAME/java-wordcount-out-SHARD_ID
    
  9. 停止 Flink YARN 工作階段。

    yarn application -list
    
    yarn application -kill YARN_APPLICATION_ID
    

可攜式 Beam 工作

如要執行以 Python、Go 和其他支援語言編寫的 Beam 工作,您可以使用 Beam 的 Flink Runner 頁面所述的 FlinkRunnerPortableRunner (請參閱可攜式架構路線圖)。

以下範例會透過 Dataproc 叢集的主要節點,在 Python 中執行可攜式 Beam 工作。

  1. 建立 Dataproc 叢集,並啟用 FlinkDocker 元件。

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK,DOCKER \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    

    注意:

    • --optional-components:Flink 和 Docker。
    • --image-version叢集的映像檔版本,用於決定叢集中安裝的 Flink 版本 (例如,請參閱最新和前四個 2.0.x 映像檔版本中列出的 Apache Flink 元件版本)。
    • --region:可用的 Dataproc 區域
    • --enable-component-gateway:啟用 Flink 工作管理員 UI 的存取權。
    • --scopes:啟用叢集對 Google Cloud API 的存取權 (請參閱範圍最佳做法)。建立使用 Dataproc 映像檔 2.1 以上版本的叢集時,系統會預設啟用 cloud-platform 範圍 (您不需要加入這個標記設定)。
  2. 在本機或 Cloud Shell 中使用 gcloud CLI 建立 Cloud Storage bucket。您會在執行範例字數計算程式時指定 BUCKET_NAME

    gcloud storage buckets create BUCKET_NAME
    
  3. 在叢集 VM 的終端機視窗中,啟動 Flink YARN 工作階段。請記下 Flink 主網址,這是執行工作時的 Flink 主機位址。執行範例字數計數程式時,您會指定 FLINK_MASTER_URL

    . /usr/bin/flink-yarn-daemon
    

    顯示並記下執行 Dataproc 叢集的 Flink 版本。執行範例字數計數程式時,您會指定 FLINK_VERSION

    flink --version
    
  4. 在叢集主節點上安裝工作所需的 Python 程式庫。

  5. 安裝與叢集中 Flink 版本相容的Beam 版本

    python -m pip install apache-beam[gcp]==BEAM_VERSION
    
  6. 在叢集主節點上執行字數範例。

    python -m apache_beam.examples.wordcount \
        --runner=FlinkRunner \
        --flink_version=FLINK_VERSION \
        --flink_master=FLINK_MASTER_URL
        --flink_submit_uber_jar \
        --output=gs://BUCKET_NAME/python-wordcount-out
    

    注意:

    • --runnerFlinkRunner
    • --flink_versionFLINK_VERSION,如前所述。
    • --flink_masterFLINK_MASTER_URL,如前所述。
    • --flink_submit_uber_jar:使用 uber JAR 執行 Beam 工作。
    • --output:先前建立的 BUCKET_NAME
  7. 確認結果是否已寫入值區。

    gcloud storage cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID
    
  8. 停止 Flink YARN 工作階段。

    1. 取得應用程式 ID。
    yarn application -list
    
    1. Insert the <var>YARN_APPLICATION_ID</var>, then stop the session.
    
    yarn application -kill 
    

Dataproc Flink 元件支援 Kerberos 叢集。如要提交及儲存 Flink 工作,或啟動 Flink 叢集,就必須使用有效的 Kerberos 票證。根據預設,Kerberos 票證的有效期限為七天。

在 Flink 工作或 Flink 工作階段叢集執行時,您可以使用 Flink Job Manager 網頁介面。如何使用網頁介面:

  1. 建立 Dataproc Flink 叢集
  2. 建立叢集後,請在 Google Cloud 主控台的「Cluster details」(叢集詳細資料) 頁面中,按一下「Web Interface」(網頁介面) 分頁標籤上的「Component Gateway」(元件閘道) YARN ResourceManager 連結
  3. YARN Resource Manager UI 中,找出 Flink 叢集應用程式項目。視工作完成狀態而定,系統會列出 ApplicationMasterHistory 連結。
  4. 如要查看長時間執行的串流工作,請按一下「ApplicationManager」連結,開啟 Flink 資訊主頁;如要查看已完成的工作,請按一下「History」連結,查看工作詳細資料。