Submit a job

You can submit a job to an existing Dataproc cluster via a Dataproc API jobs.submit HTTP or programmatic request, by using the Google Cloud CLI gcloud command-line tool in a local terminal window or in Cloud Shell, or from the Google Cloud console opened in a local browser. You can also connect to the master instance in your cluster over SSH, and then run a job directly from the instance without using the Dataproc service.

How to submit a job

Console

In your browser, go to the Dataproc Submit a job page in the Google Cloud console.

Spark job example

To submit a sample Spark job, fill in the fields on the Submit a job page as follows:

  1. Select your Cluster name from the cluster list.
  2. Set Job type to Spark.
  3. Set Main class or jar to org.apache.spark.examples.SparkPi.
  4. Set Arguments to the single argument 1000.
  5. Add file:///usr/lib/spark/examples/jars/spark-examples.jar to Jar files:
    1. file:/// denotes a Hadoop LocalFileSystem scheme. Dataproc installed /usr/lib/spark/examples/jars/spark-examples.jar on the cluster's master node when it created the cluster.
    2. Alternatively, you can specify a Cloud Storage path (gs://your-bucket/your-jarfile.jar) or a Hadoop Distributed File System path (hdfs://path-to-jar.jar) to one of your jars.

Click Submit to start the job. Once the job starts, it is added to the Jobs list.

Click the Job ID to open the Jobs page, where you can view the job's driver output. Because this job produces output lines that exceed the width of the browser window, you can check the Line wrapping box to bring all output text within view and display the calculated result for pi.

You can view your job's driver output from the command line using the gcloud dataproc jobs wait command shown below (for more information, see View job output—GCLOUD COMMAND). Copy and paste your project ID as the value for the --project flag and your job ID (shown on the Jobs list) as the final argument.

gcloud dataproc jobs wait job-id \
    --project=project-id \
    --region=region
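
For example, using the job ID shown in the sample driver output below and hypothetical project and region values (my-project and us-central1), the command might look like the following:

gcloud dataproc jobs wait c556b47a-4b46-4a94-9ba2-2dcee31167b2 \
    --project=my-project \
    --region=us-central1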

The following snippets are from the driver output of the sample SparkPi job submitted above:

...
2015-06-25 23:27:23,810 INFO [dag-scheduler-event-loop]
scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Stage 0 (reduce at
SparkPi.scala:35) finished in 21.169 s

2015-06-25 23:27:23,810 INFO [task-result-getter-3] cluster.YarnScheduler
(Logging.scala:logInfo(59)) - Removed TaskSet 0.0, whose tasks have all
completed, from pool

2015-06-25 23:27:23,819 INFO [main] scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Job 0 finished: reduce at SparkPi.scala:35,
took 21.674931 s

Pi is roughly 3.14189648
...
Job [c556b47a-4b46-4a94-9ba2-2dcee31167b2] finished successfully.

driverOutputUri:
gs://sample-staging-bucket/google-cloud-dataproc-metainfo/cfeaa033-749e-48b9-...
...

gcloud

To submit a job to a Dataproc cluster, run the gcloud CLI gcloud dataproc jobs submit command locally in a terminal window or in Cloud Shell.

gcloud dataproc jobs submit job-command \
    --cluster=cluster-name \
    --region=region \
    other dataproc-flags \
    -- job-args
PySpark job submission example
  1. List the publicly accessible hello-world.py located in Cloud Storage.
    gcloud storage cat gs://dataproc-examples/pyspark/hello-world/hello-world.py
    
    Listing of the file:

    #!/usr/bin/python
    import pyspark
    sc = pyspark.SparkContext()
    rdd = sc.parallelize(['Hello,', 'world!'])
    words = sorted(rdd.collect())
    print(words)
  2. Submit the PySpark job to Dataproc.
    gcloud dataproc jobs submit pyspark \
        gs://dataproc-examples/pyspark/hello-world/hello-world.py \
        --cluster=cluster-name  \
        --region=region
    
    Terminal output:
    Waiting for job output...
    …
    ['Hello,', 'world!']
    Job finished successfully.
    
Spark job submission example
  1. Run the SparkPi example pre-installed on the Dataproc cluster's master node.
    gcloud dataproc jobs submit spark \
        --cluster=cluster-name \
        --region=region \
        --class=org.apache.spark.examples.SparkPi \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        -- 1000
    
    Terminal output:
    Job [54825071-ae28-4c5b-85a5-58fae6a597d6] submitted.
    Waiting for job output…
    …
    Pi is roughly 3.14177148
    …
    Job finished successfully.
    …
    

REST

This section shows how to submit a Spark job to compute the approximate value of pi using the Dataproc jobs.submit API.

Before using any of the request data, make the following replacements:

  • project-id: Google Cloud project ID
  • region: cluster region
  • clusterName: cluster name

HTTP method and URL:

POST https://dataproc.googleapis.com/v1/projects/project-id/regions/region/jobs:submit

Request JSON body:

{
  "job": {
    "placement": {
      "clusterName": "cluster-name"
    },
    "sparkJob": {
      "args": [
        "1000"
      ],
      "mainClass": "org.apache.spark.examples.SparkPi",
      "jarFileUris": [
        "file:///usr/lib/spark/examples/jars/spark-examples.jar"
      ]
    }
  }
}

To send the request, you can use a command-line tool such as curl.
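
The following is a minimal sketch that assumes the request body above is saved locally in a file named request.json and that you authenticate with an access token obtained from the gcloud CLI:

curl -X POST \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    -H "Content-Type: application/json; charset=utf-8" \
    -d @request.json \
    "https://dataproc.googleapis.com/v1/projects/project-id/regions/region/jobs:submit"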

You should receive a JSON response similar to the following:

{
  "reference": {
    "projectId": "project-id",
    "jobId": "job-id"
  },
  "placement": {
    "clusterName": "cluster-name",
    "clusterUuid": "cluster-Uuid"
  },
  "sparkJob": {
    "mainClass": "org.apache.spark.examples.SparkPi",
    "args": [
      "1000"
    ],
    "jarFileUris": [
      "file:///usr/lib/spark/examples/jars/spark-examples.jar"
    ]
  },
  "status": {
    "state": "PENDING",
    "stateStartTime": "2020-10-07T20:16:21.759Z"
  },
  "jobUuid": "job-Uuid"
}

Java

  1. Install the client library
  2. Set up application default credentials (one way to do this with the gcloud CLI is sketched after the code sample below)
  3. Run the code
    
    import com.google.api.gax.longrunning.OperationFuture;
    import com.google.cloud.dataproc.v1.Job;
    import com.google.cloud.dataproc.v1.JobControllerClient;
    import com.google.cloud.dataproc.v1.JobControllerSettings;
    import com.google.cloud.dataproc.v1.JobMetadata;
    import com.google.cloud.dataproc.v1.JobPlacement;
    import com.google.cloud.dataproc.v1.SparkJob;
    import com.google.cloud.storage.Blob;
    import com.google.cloud.storage.Storage;
    import com.google.cloud.storage.StorageOptions;
    import java.io.IOException;
    import java.util.concurrent.ExecutionException;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    
    public class SubmitJob {
    
      public static void submitJob() throws IOException, InterruptedException {
        // TODO(developer): Replace these variables before running the sample.
        String projectId = "your-project-id";
        String region = "your-project-region";
        String clusterName = "your-cluster-name";
        submitJob(projectId, region, clusterName);
      }
    
      public static void submitJob(String projectId, String region, String clusterName)
          throws IOException, InterruptedException {
        String myEndpoint = String.format("%s-dataproc.googleapis.com:443", region);
    
        // Configure the settings for the job controller client.
        JobControllerSettings jobControllerSettings =
            JobControllerSettings.newBuilder().setEndpoint(myEndpoint).build();
    
        // Create a job controller client with the configured settings. Using a try-with-resources
        // closes the client,
        // but this can also be done manually with the .close() method.
        try (JobControllerClient jobControllerClient =
            JobControllerClient.create(jobControllerSettings)) {
    
          // Configure cluster placement for the job.
          JobPlacement jobPlacement = JobPlacement.newBuilder().setClusterName(clusterName).build();
    
          // Configure Spark job settings.
          SparkJob sparkJob =
              SparkJob.newBuilder()
                  .setMainClass("org.apache.spark.examples.SparkPi")
                  .addJarFileUris("file:///usr/lib/spark/examples/jars/spark-examples.jar")
                  .addArgs("1000")
                  .build();
    
          Job job = Job.newBuilder().setPlacement(jobPlacement).setSparkJob(sparkJob).build();
    
          // Submit an asynchronous request to execute the job.
          OperationFuture<Job, JobMetadata> submitJobAsOperationAsyncRequest =
              jobControllerClient.submitJobAsOperationAsync(projectId, region, job);
    
          Job response = submitJobAsOperationAsyncRequest.get();
    
          // Print output from Google Cloud Storage.
          Matcher matches =
              Pattern.compile("gs://(.*?)/(.*)").matcher(response.getDriverOutputResourceUri());
          matches.matches();
    
          Storage storage = StorageOptions.getDefaultInstance().getService();
          Blob blob = storage.get(matches.group(1), String.format("%s.000000000", matches.group(2)));
    
          System.out.println(
              String.format("Job finished successfully: %s", new String(blob.getContent())));
    
        } catch (ExecutionException e) {
          // If the job does not complete successfully, print the error message.
          System.err.println(String.format("submitJob: %s ", e.getMessage()));
        }
      }
    }
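
For local development, application default credentials can typically be established with the gcloud CLI before running any of the client library samples in this section (a sketch; other credential mechanisms, such as attached service accounts, also work):

gcloud auth application-default login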

Python

  1. Install the client library
  2. Set up application default credentials
  3. Run the code
    import re
    
    
    from google.cloud import dataproc_v1 as dataproc
    from google.cloud import storage
    
    
    def submit_job(project_id, region, cluster_name):
        # Create the job client.
        job_client = dataproc.JobControllerClient(
            client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
        )
    
        # Create the job config. 'main_jar_file_uri' can also be a
        # Google Cloud Storage URL.
        job = {
            "placement": {"cluster_name": cluster_name},
            "spark_job": {
                "main_class": "org.apache.spark.examples.SparkPi",
                "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
                "args": ["1000"],
            },
        }
    
        operation = job_client.submit_job_as_operation(
            request={"project_id": project_id, "region": region, "job": job}
        )
        response = operation.result()
    
        # Dataproc job output gets saved to the Google Cloud Storage bucket
        # allocated to the job. Use a regex to obtain the bucket and blob info.
        matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)
    
        output = (
            storage.Client()
            .get_bucket(matches.group(1))
            .blob(f"{matches.group(2)}.000000000")
            .download_as_bytes()
            .decode("utf-8")
        )
    
        print(f"Job finished successfully: {output}")
    
    

Go

  1. Install the client library
  2. Set up application default credentials
  3. Run the code
    import (
    	"context"
    	"fmt"
    	"io"
    	"log"
    	"regexp"
    
    	dataproc "cloud.google.com/go/dataproc/apiv1"
    	"cloud.google.com/go/dataproc/apiv1/dataprocpb"
    	"cloud.google.com/go/storage"
    	"google.golang.org/api/option"
    )
    
    func submitJob(w io.Writer, projectID, region, clusterName string) error {
    	// projectID := "your-project-id"
    	// region := "us-central1"
    	// clusterName := "your-cluster"
    	ctx := context.Background()
    
    	// Create the job client.
    	endpoint := fmt.Sprintf("%s-dataproc.googleapis.com:443", region)
    	jobClient, err := dataproc.NewJobControllerClient(ctx, option.WithEndpoint(endpoint))
    	if err != nil {
    		log.Fatalf("error creating the job client: %s\n", err)
    	}
    
    	// Create the job config.
    	submitJobReq := &dataprocpb.SubmitJobRequest{
    		ProjectId: projectID,
    		Region:    region,
    		Job: &dataprocpb.Job{
    			Placement: &dataprocpb.JobPlacement{
    				ClusterName: clusterName,
    			},
    			TypeJob: &dataprocpb.Job_SparkJob{
    				SparkJob: &dataprocpb.SparkJob{
    					Driver: &dataprocpb.SparkJob_MainClass{
    						MainClass: "org.apache.spark.examples.SparkPi",
    					},
    					JarFileUris: []string{"file:///usr/lib/spark/examples/jars/spark-examples.jar"},
    					Args:        []string{"1000"},
    				},
    			},
    		},
    	}
    
    	submitJobOp, err := jobClient.SubmitJobAsOperation(ctx, submitJobReq)
    	if err != nil {
    		return fmt.Errorf("error with request to submitting job: %w", err)
    	}
    
    	submitJobResp, err := submitJobOp.Wait(ctx)
    	if err != nil {
    		return fmt.Errorf("error submitting job: %w", err)
    	}
    
    	re := regexp.MustCompile("gs://(.+?)/(.+)")
    	matches := re.FindStringSubmatch(submitJobResp.DriverOutputResourceUri)
    
    	if len(matches) < 3 {
    		return fmt.Errorf("regex error: %s", submitJobResp.DriverOutputResourceUri)
    	}
    
    	// Dataproc job output gets saved to a GCS bucket allocated to it.
    	storageClient, err := storage.NewClient(ctx)
    	if err != nil {
    		return fmt.Errorf("error creating storage client: %w", err)
    	}
    
    	obj := fmt.Sprintf("%s.000000000", matches[2])
    	reader, err := storageClient.Bucket(matches[1]).Object(obj).NewReader(ctx)
    	if err != nil {
    		return fmt.Errorf("error reading job output: %w", err)
    	}
    
    	defer reader.Close()
    
    	body, err := io.ReadAll(reader)
    	if err != nil {
    		return fmt.Errorf("could not read output from Dataproc Job: %w", err)
    	}
    
    	fmt.Fprintf(w, "Job finished successfully: %s", body)
    
    	return nil
    }
    

Node.js

  1. Install the client library
  2. Set up application default credentials
  3. Run the code
    const dataproc = require('@google-cloud/dataproc');
    const {Storage} = require('@google-cloud/storage');
    
    // TODO(developer): Uncomment and set the following variables
    // projectId = 'YOUR_PROJECT_ID'
    // region = 'YOUR_CLUSTER_REGION'
    // clusterName = 'YOUR_CLUSTER_NAME'
    
    // Create a client with the endpoint set to the desired cluster region
    const jobClient = new dataproc.v1.JobControllerClient({
      apiEndpoint: `${region}-dataproc.googleapis.com`,
      projectId: projectId,
    });
    
    async function submitJob() {
      const job = {
        projectId: projectId,
        region: region,
        job: {
          placement: {
            clusterName: clusterName,
          },
          sparkJob: {
            mainClass: 'org.apache.spark.examples.SparkPi',
            jarFileUris: [
              'file:///usr/lib/spark/examples/jars/spark-examples.jar',
            ],
            args: ['1000'],
          },
        },
      };
    
      const [jobOperation] = await jobClient.submitJobAsOperation(job);
      const [jobResponse] = await jobOperation.promise();
    
      const matches =
        jobResponse.driverOutputResourceUri.match('gs://(.*?)/(.*)');
    
      const storage = new Storage();
    
      const output = await storage
        .bucket(matches[1])
        .file(`${matches[2]}.000000000`)
        .download();
    
      // Output a success message.
      console.log(`Job finished successfully: ${output}`);
    }

    submitJob();

Submit a job directly on your cluster

If you want to run a job directly on your cluster without using the Dataproc service, SSH into the master node of your cluster, then run the job on the master node.

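A minimal sketch of opening that SSH connection with the gcloud CLI, assuming the default master node name cluster-name-m and placeholder project and zone values:

gcloud compute ssh cluster-name-m \
    --project=project-id \
    --zone=cluster-zone
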
After establishing an SSH connection to the VM master instance, run commands in a terminal window on the cluster's master node to:

  1. Open a Spark shell.
  2. Run a simple Spark job to count the number of lines in the (seven-line) Python "hello-world" file located in a publicly accessible Cloud Storage location.
  3. Quit the shell.

    user@cluster-name-m:~$ spark-shell
    ...
    scala> sc.textFile("gs://dataproc-examples"
    + "/pyspark/hello-world/hello-world.py").count
    ...
    res0: Long = 7
    scala> :quit
    

Run bash jobs on Dataproc

You may want to run a bash script as your Dataproc job, either because the engines you use aren't supported as a top-level Dataproc job type, or because you need to do additional setup or calculation of arguments before launching a job using hadoop or spark-submit from your script.

Pig example

Assume you have copied a hello.sh bash script into Cloud Storage:

gcloud storage cp hello.sh gs://${BUCKET}/hello.sh

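For illustration, hello.sh can be any executable bash script; a hypothetical minimal version might look like this:

#!/bin/bash
echo "hello from a Dataproc bash job"
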
Because the pig fs command uses Hadoop paths, copy the script from Cloud Storage to a destination specified as file:/// to make sure it's on the local filesystem instead of HDFS. The subsequent sh commands reference the local filesystem automatically and do not require the file:/// prefix.

gcloud dataproc jobs submit pig --cluster=${CLUSTER} --region=${REGION} \
    -e='fs -cp -f gs://${BUCKET}/hello.sh file:///tmp/hello.sh; sh chmod 750 /tmp/hello.sh; sh /tmp/hello.sh'

Alternatively, since the Dataproc job submit --jars argument stages files to a temporary directory created for the lifetime of the job, you can specify your Cloud Storage bash script as a --jars argument:

gcloud dataproc jobs submit pig --cluster=${CLUSTER} --region=${REGION} \
    --jars=gs://${BUCKET}/hello.sh \
    -e='sh chmod 750 ${PWD}/hello.sh; sh ${PWD}/hello.sh'

Note that the --jars argument can also reference a local script:

gcloud dataproc jobs submit pig --cluster=${CLUSTER} --region=${REGION} \
    --jars=hello.sh \
    -e='sh chmod 750 ${PWD}/hello.sh; sh ${PWD}/hello.sh'