在 Google Kubernetes Engine 上的 Dataproc 上執行 Spark 工作

事前準備

  1. 您必須建立標準 (非 Autopilot) Google Kubernetes Engine (GKE) 區域地區叢集,且叢集已啟用 Workload Identity

建立 Dataproc on GKE 虛擬叢集

系統會建立 Dataproc on GKE 虛擬叢集,做為 Dataproc 元件的部署平台。這是虛擬資源,與 Dataproc on Compute Engine 叢集不同,不包含個別的 Dataproc 主要和工作站 VM。

  • 建立 Dataproc on GKE 虛擬叢集時,Dataproc on GKE 會在 GKE 叢集中建立節點集區。

  • Dataproc on GKE 工作會以 Pod 的形式在這些節點集區上執行。節點集區和節點集區中 Pod 的排程由 GKE 管理。

  • 建立多個虛擬叢集。您可以在 GKE 叢集上建立及執行多個虛擬叢集,並在虛擬叢集之間共用節點集區,進而提升資源使用率。

    • 每個虛擬叢集:
      • 是使用個別屬性建立,包括 Spark 引擎版本和工作負載身分
      • 在 GKE 叢集的獨立 GKE 命名空間中隔離

控制台

  1. 在 Google Cloud 控制台中,前往 Dataproc 的「Clusters」(叢集) 頁面。

    前往「Clusters」(叢集)

  2. 按一下 [Create cluster] (建立叢集)

  3. 在「Create Dataproc cluster」(建立 Dataproc 叢集) 對話方塊中,按一下「Cluster on GKE」(GKE 上的叢集) 列中的「Create」(建立)

  4. 在「設定叢集」面板中:

    1. 在「叢集名稱」欄位中,輸入叢集的名稱。
    2. 在「Region」(區域) 清單中,選取 Dataproc on GKE 虛擬叢集的區域。這個地區必須與現有 GKE 叢集所在的地區相同 (您會在下一個項目中選取)。
    3. 在「Kubernetes Cluster」(Kubernetes 叢集) 欄位中,按一下「Browse」(瀏覽),選取現有 GKE 叢集所在的區域。
    4. 選用:在「Cloud Storage staging bucket」(Cloud Storage 暫存值區) 欄位中,按一下「Browse」(瀏覽) 即可選取現有的 Cloud Storage 值區。Dataproc on GKE 會在值區中暫存構件。如要讓 Dataproc on GKE 建立暫存值區,請忽略這個欄位。
  5. 在左側面板中,按一下「設定節點集區」,然後在「節點集區」面板中,按一下「新增集區」

    1. 如要重複使用現有的 Dataproc on GKE 節點集區:
      1. 按一下「重複使用現有的節點集區」
      2. 輸入現有節點集區的名稱,然後選取其「角色」。 至少須有一個節點集區具備 DEFAULT 角色。
      3. 按一下 [完成]
    2. 如要建立新的 Dataproc on GKE 節點集區,請按照下列步驟操作:
      1. 按一下「建立新的節點集區」
      2. 輸入下列節點集區值:
    3. 按一下「新增集區」即可新增更多節點集區。所有節點集區都必須有位置。最多可以新增四個節點集區。
  6. (選用) 如果您已設定 Dataproc 持續性記錄伺服器 (PHS),可查看 GKE 上現有和已刪除 Dataproc 叢集的 Spark 工作記錄,請按一下「自訂叢集」。然後在「記錄伺服器叢集」欄位中,瀏覽並選擇 PHS 叢集。PHS 叢集必須與 Dataproc on GKE 虛擬叢集位於相同區域。

  7. 按一下「建立」,建立 Dataproc 叢集。Dataproc on GKE 叢集會顯示在「叢集」頁面的清單中。在叢集準備好可以使用之前,狀態會顯示為「佈建中」,之後狀態就會變更為「執行中」

gcloud

設定環境變數,然後在本機或 Cloud Shell 中執行 gcloud dataproc clusters gke create 指令,建立 Dataproc on GKE 叢集。

  1. 設定環境變數:

    DP_CLUSTER=Dataproc on GKE  cluster-name \
      REGION=region \
      GKE_CLUSTER=GKE cluster-name \
      BUCKET=Cloud Storage bucket-name \
      DP_POOLNAME=node pool-name
      PHS_CLUSTER=Dataproc PHS server name
    
    注意事項:

    • DP_CLUSTER:設定 Dataproc 虛擬叢集名稱,開頭須為小寫英文字母,其後最多可使用 54 個小寫英文字母、數字或連字號。結尾不得為連字號。
    • REGIONregion必須與 GKE 叢集所在的區域相同。
    • GKE_CLUSTER:現有 GKE 叢集的名稱。
    • BUCKET:(選用) 您可以指定 Cloud Storage bucket 的名稱,Dataproc 會使用這個 bucket 暫存構件。如果未指定 bucket,GKE 上的 Dataproc 會建立暫存 bucket。
    • DP_POOLNAME:要在 GKE 叢集上建立的節點集區名稱。
    • PHS_CLUSTER:(選用) Dataproc PHS 伺服器,用於查看現有和已刪除的 Dataproc on GKE 叢集中的 Spark 工作記錄。PHS 叢集必須與 Dataproc on GKE 虛擬叢集位於相同區域。
  2. 執行下列指令:

    gcloud dataproc clusters gke create ${DP_CLUSTER} \
        --region=${REGION} \
        --gke-cluster=${GKE_CLUSTER} \
        --spark-engine-version=latest \
        --staging-bucket=${BUCKET} \
        --pools="name=${DP_POOLNAME},roles=default" \
        --setup-workload-identity \
        --history-server-cluster=${PHS_CLUSTER}
    
    注意事項:

    • --spark-engine-version:Dataproc 叢集使用的 Spark 映像檔版本。您可以使用 ID,例如 33.1latest,也可以指定完整次要版本,例如 3.1-dataproc-5
    • --staging-bucket:刪除這個標記,讓 Dataproc on GKE 建立暫存值區。
    • --pools:這個標記用於指定 Dataproc 將建立或用來執行工作負載的新節點集區或現有節點集區。以半形逗號分隔的 Dataproc on GKE 節點集區設定清單,例如:
      --pools=name=dp-default,roles=default,machineType=e2-standard-4,min=0,max=10
      
      您必須指定節點集區 namerole。其他節點集區設定則可視需要指定。您可以使用多個 --pools 標記指定多個節點集區。至少須有一個節點集區具備 default 角色。所有節點集區必須位於相同位置。
    • --setup-workload-identity:這個旗標會啟用 Workload Identity 繫結。這些繫結可讓 Kubernetes 服務帳戶 (KSA) 做為虛擬叢集的預設 Dataproc VM 服務帳戶 (資料層身分)

REST

在 Dataproc API cluster.create 請求中,完成 virtualClusterConfig

使用任何要求資料之前,請先替換以下項目:

  • PROJECT:Google Cloud 專案 ID
  • REGION:Dataproc 虛擬叢集區域 (與現有 GKE 叢集區域相同)
  • DP_CLUSTER:Dataproc 叢集名稱
  • GKE_CLUSTER:GKE 叢集名稱
  • NODE_POOL:節點集區名稱
  • PHS_CLUSTER永久記錄伺服器 (PHS) 叢集名稱
  • BUCKET:(選用) 暫存值區名稱。如要讓 Dataproc on GKE 建立暫存值區,請將這個欄位留空。

HTTP 方法和網址:

POST https://dataproc.googleapis.com/v1/projects/project-id/regions/region/clusters

JSON 要求主體:

{
  "clusterName":"DP_CLUSTER",
  "projectId":"PROJECT",
  "virtualClusterConfig":{
    "auxiliaryServicesConfig":{
      "sparkHistoryServerConfig":{
        "dataprocCluster":"projects/PROJECT/regions/REGION/clusters/PHS_CLUSTER"
      }
    },
    "kubernetesClusterConfig":{
      "gkeClusterConfig":{
        "gkeClusterTarget":"projects/PROJECT/locations/REGION/clusters/GKE_CLUSTER",
        "nodePoolTarget":[
          {
"nodePool":"projects/PROJECT/locations/REGION/clusters/GKE_CLUSTER/nodePools/NODE_POOL",
            "roles":[
              "DEFAULT"
            ]
          }
        ]
      },
      "kubernetesSoftwareConfig":{
        "componentVersion":{
          "SPARK":"latest"
        }
      }
    },
    "stagingBucket":"BUCKET"
  }
}

如要傳送要求,請展開以下其中一個選項:

您應該會收到如下的 JSON 回應:

{
  "projectId":"PROJECT",
  "clusterName":"DP_CLUSTER",
  "status":{
    "state":"RUNNING",
    "stateStartTime":"2022-04-01T19:16:39.865716Z"
  },
  "clusterUuid":"98060b77-...",
  "statusHistory":[
    {
      "state":"CREATING",
      "stateStartTime":"2022-04-01T19:14:27.340544Z"
    }
  ],
  "labels":{
    "goog-dataproc-cluster-name":"DP_CLUSTER",
    "goog-dataproc-cluster-uuid":"98060b77-...",
    "goog-dataproc-location":"REGION",
    "goog-dataproc-environment":"prod"
  },
  "virtualClusterConfig":{
    "stagingBucket":"BUCKET",
    "kubernetesClusterConfig":{
      "kubernetesNamespace":"dp-cluster",
      "gkeClusterConfig":{
"gkeClusterTarget":"projects/PROJECT/locations/REGION/clusters/GKE_CLUSTER",
        "nodePoolTarget":[
          {
"nodePool":"projects/PROJECT/locations/REGION/clusters/GKE_CLUSTER/nodePools/NODE_POOL",
            "roles":[
              "DEFAULT"
            ]
          }
        ]
      },
      "kubernetesSoftwareConfig":{
        "componentVersion":{
          "SPARK":"3.1-..."
        },
        "properties":{
          "dpgke:dpgke.unstable.outputOnly.endpoints.sparkHistoryServer":"https://...",
          "spark:spark.eventLog.dir":"gs://BUCKET/.../spark-job-history",
          "spark:spark.eventLog.enabled":"true"
        }
      }
    },
    "auxiliaryServicesConfig":{
      "sparkHistoryServerConfig":{
        "dataprocCluster":"projects/PROJECT/regions/REGION/clusters/PHS_CLUSTER"
      }
    }
  }

提交 Spark 工作

Dataproc on GKE 虛擬叢集執行後,請使用 Google Cloud 控制台、gcloud CLI 或 Dataproc jobs.submit API (透過直接 HTTP 要求或 Cloud Client Libraries),提交 Spark 工作

gcloud CLI Spark 工作範例:

gcloud dataproc jobs submit spark \
    --region=${REGION} \
    --cluster=${DP_CLUSTER} \
    --class=org.apache.spark.examples.SparkPi \
    --jars=local:///usr/lib/spark/examples/jars/spark-examples.jar \
    -- 1000

gcloud CLI PySpark 作業範例:

gcloud dataproc jobs submit pyspark \
    --region=${REGION} \
    --cluster=${DP_CLUSTER} \
    local:///usr/lib/spark/examples/src/main/python/pi.py \
    -- 10

gcloud CLI SparkR 作業範例:

gcloud dataproc jobs submit spark-r \
    --region=${REGION} \
    --cluster=${DP_CLUSTER} \
    local:///usr/lib/spark/examples/src/main/r/dataframe.R

清除所用資源