Kirim tugas

Anda dapat mengirimkan tugas ke cluster Dataproc yang ada melalui permintaan HTTP atau terprogram jobs.submit Dataproc API, menggunakan alat command line gcloud Google Cloud CLI di jendela terminal lokal atau di Cloud Shell, atau dari konsolGoogle Cloud yang dibuka di browser lokal. Anda juga dapat melakukan SSH ke instance master di cluster, lalu menjalankan tugas langsung dari instance tanpa menggunakan layanan Dataproc.

Cara mengirimkan tugas

Konsol

Buka halaman Dataproc Submit a job di konsol Google Cloud di browser Anda.

Contoh tugas Spark

Untuk mengirimkan sampel tugas Spark, isi kolom di halaman Submit a job, sebagai berikut:

  1. Pilih nama Cluster Anda dari daftar cluster.
  2. Tetapkan Jenis pekerjaan ke Spark.
  3. Tetapkan Main class or jar ke org.apache.spark.examples.SparkPi.
  4. Tetapkan Arguments ke argumen tunggal 1000.
  5. Tambahkan file:///usr/lib/spark/examples/jars/spark-examples.jar ke file Jar:
    1. file:/// menunjukkan skema Hadoop LocalFileSystem. Dataproc menginstal /usr/lib/spark/examples/jars/spark-examples.jar di node master cluster saat membuat cluster.
    2. Atau, Anda dapat menentukan jalur Cloud Storage (gs://your-bucket/your-jarfile.jar) atau jalur Hadoop Distributed File System (hdfs://path-to-jar.jar) ke salah satu JAR Anda.

Klik Submit untuk memulai tugas. Setelah tugas dimulai, tugas tersebut akan ditambahkan ke daftar Tugas.

Klik ID Tugas untuk membuka halaman Tugas, tempat Anda dapat melihat output driver tugas. Karena tugas ini menghasilkan baris output panjang yang melebihi lebar jendela browser, Anda dapat mencentang kotak Pengepasan baris untuk menampilkan semua teks output dalam tampilan guna menampilkan hasil yang dihitung untuk pi.

Anda dapat melihat output driver tugas dari command line menggunakan perintah gcloud dataproc jobs wait yang ditampilkan di bawah (untuk mengetahui informasi selengkapnya, lihat Melihat output tugas–PERINTAH GCLOUD). Salin dan tempel project ID Anda sebagai nilai untuk tanda --project dan ID Pekerjaan Anda (ditampilkan di daftar Pekerjaan) sebagai argumen akhir.

gcloud dataproc jobs wait job-id \
    --project=project-id \
    --region=region

Berikut adalah cuplikan dari output driver untuk tugas SparkPi contoh yang dikirimkan di atas:

...
2015-06-25 23:27:23,810 INFO [dag-scheduler-event-loop]
scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Stage 0 (reduce at
SparkPi.scala:35) finished in 21.169 s

2015-06-25 23:27:23,810 INFO [task-result-getter-3] cluster.YarnScheduler
(Logging.scala:logInfo(59)) - Removed TaskSet 0.0, whose tasks have all
completed, from pool

2015-06-25 23:27:23,819 INFO [main] scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Job 0 finished: reduce at SparkPi.scala:35,
took 21.674931 s

Pi is roughly 3.14189648
...
Job [c556b47a-4b46-4a94-9ba2-2dcee31167b2] finished successfully.

driverOutputUri:
gs://sample-staging-bucket/google-cloud-dataproc-metainfo/cfeaa033-749e-48b9-...
...

gcloud

Untuk mengirimkan tugas ke cluster Dataproc, jalankan perintah gcloud CLI gcloud dataproc jobs submit secara lokal di jendela terminal atau di Cloud Shell.

gcloud dataproc jobs submit job-command \
    --cluster=cluster-name \
    --region=region \
    other dataproc-flags \
    -- job-args
Contoh pengiriman tugas PySpark
  1. Mencantumkan hello-world.py yang dapat diakses secara publik dan terletak di Cloud Storage.
    gcloud storage cat gs://dataproc-examples/pyspark/hello-world/hello-world.py
    
    Daftar File:

    #!/usr/bin/python
    import pyspark
    sc = pyspark.SparkContext()
    rdd = sc.parallelize(['Hello,', 'world!'])
    words = sorted(rdd.collect())
    print(words)
  2. Kirim tugas Pyspark ke Dataproc.
    gcloud dataproc jobs submit pyspark \
        gs://dataproc-examples/pyspark/hello-world/hello-world.py \
        --cluster=cluster-name  \
        --region=region
    
    Output terminal:
    Waiting for job output...
    …
    ['Hello,', 'world!']
    Job finished successfully.
    
Contoh pengiriman tugas Spark
  1. Jalankan contoh SparkPi yang telah diinstal sebelumnya di node master cluster Dataproc.
    gcloud dataproc jobs submit spark \
        --cluster=cluster-name \
        --region=region \
        --class=org.apache.spark.examples.SparkPi \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        -- 1000
    
    Output terminal:
    Job [54825071-ae28-4c5b-85a5-58fae6a597d6] submitted.
    Waiting for job output…
    …
    Pi is roughly 3.14177148
    …
    Job finished successfully.
    …
    

REST

Bagian ini menunjukkan cara mengirimkan tugas Spark untuk menghitung perkiraan nilai pi menggunakan Dataproc API jobs.submit.

Sebelum menggunakan salah satu data permintaan, lakukan penggantian berikut:

  • project-id: Google Cloud project ID
  • region: cluster region
  • clusterName: nama cluster

Metode HTTP dan URL:

POST https://dataproc.googleapis.com/v1/projects/project-id/regions/region/jobs:submit

Meminta isi JSON:

{
  "job": {
    "placement": {
      "clusterName": "cluster-name"
    },
    "sparkJob": {
      "args": [
        "1000"
      ],
      "mainClass": "org.apache.spark.examples.SparkPi",
      "jarFileUris": [
        "file:///usr/lib/spark/examples/jars/spark-examples.jar"
      ]
    }
  }
}

Untuk mengirim permintaan Anda, perluas salah satu opsi berikut:

Anda akan melihat respons JSON seperti berikut:

{
  "reference": {
    "projectId": "project-id",
    "jobId": "job-id"
  },
  "placement": {
    "clusterName": "cluster-name",
    "clusterUuid": "cluster-Uuid"
  },
  "sparkJob": {
    "mainClass": "org.apache.spark.examples.SparkPi",
    "args": [
      "1000"
    ],
    "jarFileUris": [
      "file:///usr/lib/spark/examples/jars/spark-examples.jar"
    ]
  },
  "status": {
    "state": "PENDING",
    "stateStartTime": "2020-10-07T20:16:21.759Z"
  },
  "jobUuid": "job-Uuid"
}

Java

  1. Instal library klien
  2. Menyiapkan kredensial default aplikasi
  3. Jalankan kode
    
    import com.google.api.gax.longrunning.OperationFuture;
    import com.google.cloud.dataproc.v1.Job;
    import com.google.cloud.dataproc.v1.JobControllerClient;
    import com.google.cloud.dataproc.v1.JobControllerSettings;
    import com.google.cloud.dataproc.v1.JobMetadata;
    import com.google.cloud.dataproc.v1.JobPlacement;
    import com.google.cloud.dataproc.v1.SparkJob;
    import com.google.cloud.storage.Blob;
    import com.google.cloud.storage.Storage;
    import com.google.cloud.storage.StorageOptions;
    import java.io.IOException;
    import java.util.concurrent.ExecutionException;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    
    public class SubmitJob {
    
      public static void submitJob() throws IOException, InterruptedException {
        // TODO(developer): Replace these variables before running the sample.
        String projectId = "your-project-id";
        String region = "your-project-region";
        String clusterName = "your-cluster-name";
        submitJob(projectId, region, clusterName);
      }
    
      public static void submitJob(String projectId, String region, String clusterName)
          throws IOException, InterruptedException {
        String myEndpoint = String.format("%s-dataproc.googleapis.com:443", region);
    
        // Configure the settings for the job controller client.
        JobControllerSettings jobControllerSettings =
            JobControllerSettings.newBuilder().setEndpoint(myEndpoint).build();
    
        // Create a job controller client with the configured settings. Using a try-with-resources
        // closes the client,
        // but this can also be done manually with the .close() method.
        try (JobControllerClient jobControllerClient =
            JobControllerClient.create(jobControllerSettings)) {
    
          // Configure cluster placement for the job.
          JobPlacement jobPlacement = JobPlacement.newBuilder().setClusterName(clusterName).build();
    
          // Configure Spark job settings.
          SparkJob sparkJob =
              SparkJob.newBuilder()
                  .setMainClass("org.apache.spark.examples.SparkPi")
                  .addJarFileUris("file:///usr/lib/spark/examples/jars/spark-examples.jar")
                  .addArgs("1000")
                  .build();
    
          Job job = Job.newBuilder().setPlacement(jobPlacement).setSparkJob(sparkJob).build();
    
          // Submit an asynchronous request to execute the job.
          OperationFuture<Job, JobMetadata> submitJobAsOperationAsyncRequest =
              jobControllerClient.submitJobAsOperationAsync(projectId, region, job);
    
          Job response = submitJobAsOperationAsyncRequest.get();
    
          // Print output from Google Cloud Storage.
          Matcher matches =
              Pattern.compile("gs://(.*?)/(.*)").matcher(response.getDriverOutputResourceUri());
          matches.matches();
    
          Storage storage = StorageOptions.getDefaultInstance().getService();
          Blob blob = storage.get(matches.group(1), String.format("%s.000000000", matches.group(2)));
    
          System.out.println(
              String.format("Job finished successfully: %s", new String(blob.getContent())));
    
        } catch (ExecutionException e) {
          // If the job does not complete successfully, print the error message.
          System.err.println(String.format("submitJob: %s ", e.getMessage()));
        }
      }
    }

Python

  1. Instal library klien
  2. Menyiapkan kredensial default aplikasi
  3. Jalankan kode
    import re
    
    
    from google.cloud import dataproc_v1 as dataproc
    from google.cloud import storage
    
    
    def submit_job(project_id, region, cluster_name):
        # Create the job client.
        job_client = dataproc.JobControllerClient(
            client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
        )
    
        # Create the job config. 'main_jar_file_uri' can also be a
        # Google Cloud Storage URL.
        job = {
            "placement": {"cluster_name": cluster_name},
            "spark_job": {
                "main_class": "org.apache.spark.examples.SparkPi",
                "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
                "args": ["1000"],
            },
        }
    
        operation = job_client.submit_job_as_operation(
            request={"project_id": project_id, "region": region, "job": job}
        )
        response = operation.result()
    
        # Dataproc job output gets saved to the Google Cloud Storage bucket
        # allocated to the job. Use a regex to obtain the bucket and blob info.
        matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)
    
        output = (
            storage.Client()
            .get_bucket(matches.group(1))
            .blob(f"{matches.group(2)}.000000000")
            .download_as_bytes()
            .decode("utf-8")
        )
    
        print(f"Job finished successfully: {output}")
    
    

Go

  1. Instal library klien
  2. Menyiapkan kredensial default aplikasi
  3. Jalankan kode
    import (
    	"context"
    	"fmt"
    	"io"
    	"log"
    	"regexp"
    
    	dataproc "cloud.google.com/go/dataproc/apiv1"
    	"cloud.google.com/go/dataproc/apiv1/dataprocpb"
    	"cloud.google.com/go/storage"
    	"google.golang.org/api/option"
    )
    
    func submitJob(w io.Writer, projectID, region, clusterName string) error {
    	// projectID := "your-project-id"
    	// region := "us-central1"
    	// clusterName := "your-cluster"
    	ctx := context.Background()
    
    	// Create the job client.
    	endpoint := fmt.Sprintf("%s-dataproc.googleapis.com:443", region)
    	jobClient, err := dataproc.NewJobControllerClient(ctx, option.WithEndpoint(endpoint))
    	if err != nil {
    		log.Fatalf("error creating the job client: %s\n", err)
    	}
    
    	// Create the job config.
    	submitJobReq := &dataprocpb.SubmitJobRequest{
    		ProjectId: projectID,
    		Region:    region,
    		Job: &dataprocpb.Job{
    			Placement: &dataprocpb.JobPlacement{
    				ClusterName: clusterName,
    			},
    			TypeJob: &dataprocpb.Job_SparkJob{
    				SparkJob: &dataprocpb.SparkJob{
    					Driver: &dataprocpb.SparkJob_MainClass{
    						MainClass: "org.apache.spark.examples.SparkPi",
    					},
    					JarFileUris: []string{"file:///usr/lib/spark/examples/jars/spark-examples.jar"},
    					Args:        []string{"1000"},
    				},
    			},
    		},
    	}
    
    	submitJobOp, err := jobClient.SubmitJobAsOperation(ctx, submitJobReq)
    	if err != nil {
    		return fmt.Errorf("error with request to submitting job: %w", err)
    	}
    
    	submitJobResp, err := submitJobOp.Wait(ctx)
    	if err != nil {
    		return fmt.Errorf("error submitting job: %w", err)
    	}
    
    	re := regexp.MustCompile("gs://(.+?)/(.+)")
    	matches := re.FindStringSubmatch(submitJobResp.DriverOutputResourceUri)
    
    	if len(matches) < 3 {
    		return fmt.Errorf("regex error: %s", submitJobResp.DriverOutputResourceUri)
    	}
    
    	// Dataproc job output gets saved to a GCS bucket allocated to it.
    	storageClient, err := storage.NewClient(ctx)
    	if err != nil {
    		return fmt.Errorf("error creating storage client: %w", err)
    	}
    
    	obj := fmt.Sprintf("%s.000000000", matches[2])
    	reader, err := storageClient.Bucket(matches[1]).Object(obj).NewReader(ctx)
    	if err != nil {
    		return fmt.Errorf("error reading job output: %w", err)
    	}
    
    	defer reader.Close()
    
    	body, err := io.ReadAll(reader)
    	if err != nil {
    		return fmt.Errorf("could not read output from Dataproc Job: %w", err)
    	}
    
    	fmt.Fprintf(w, "Job finished successfully: %s", body)
    
    	return nil
    }
    

Node.js

  1. Instal library klien
  2. Menyiapkan kredensial default aplikasi
  3. Jalankan kode
    const dataproc = require('@google-cloud/dataproc');
    const {Storage} = require('@google-cloud/storage');
    
    // TODO(developer): Uncomment and set the following variables
    // projectId = 'YOUR_PROJECT_ID'
    // region = 'YOUR_CLUSTER_REGION'
    // clusterName = 'YOUR_CLUSTER_NAME'
    
    // Create a client with the endpoint set to the desired cluster region
    const jobClient = new dataproc.v1.JobControllerClient({
      apiEndpoint: `${region}-dataproc.googleapis.com`,
      projectId: projectId,
    });
    
    async function submitJob() {
      const job = {
        projectId: projectId,
        region: region,
        job: {
          placement: {
            clusterName: clusterName,
          },
          sparkJob: {
            mainClass: 'org.apache.spark.examples.SparkPi',
            jarFileUris: [
              'file:///usr/lib/spark/examples/jars/spark-examples.jar',
            ],
            args: ['1000'],
          },
        },
      };
    
      const [jobOperation] = await jobClient.submitJobAsOperation(job);
      const [jobResponse] = await jobOperation.promise();
    
      const matches =
        jobResponse.driverOutputResourceUri.match('gs://(.*?)/(.*)');
    
      const storage = new Storage();
    
      const output = await storage
        .bucket(matches[1])
        .file(`${matches[2]}.000000000`)
        .download();
    
      // Output a success message.
      console.log(`Job finished successfully: ${output}`);

Mengirimkan tugas langsung di cluster Anda

Jika Anda ingin menjalankan tugas secara langsung di cluster tanpa menggunakan layanan Dataproc, lakukan SSH ke node master cluster Anda, lalu jalankan tugas di node master.

Setelah membuat koneksi SSH ke instance master VM, jalankan perintah di jendela terminal pada node master cluster untuk:

  1. Buka shell Spark.
  2. Jalankan tugas Spark sederhana untuk menghitung jumlah baris dalam file "hello-world" Python (tujuh baris) yang berada di file Cloud Storage yang dapat diakses secara publik.
  3. Keluar dari shell.

    user@cluster-name-m:~$ spark-shell
    ...
    scala> sc.textFile("gs://dataproc-examples"
    + "/pyspark/hello-world/hello-world.py").count
    ...
    res0: Long = 7
    scala> :quit
    

Menjalankan tugas bash di Dataproc

Anda mungkin ingin menjalankan skrip bash sebagai tugas Dataproc, baik karena mesin yang Anda gunakan tidak didukung sebagai jenis tugas Dataproc tingkat teratas atau karena Anda perlu melakukan penyiapan atau penghitungan argumen tambahan sebelum meluncurkan tugas menggunakan hadoop atau spark-submit dari skrip Anda.

Contoh Pig

Asumsikan Anda menyalin skrip bash hello.sh ke Cloud Storage:

gcloud storage cp hello.sh gs://${BUCKET}/hello.sh

Karena perintah pig fs menggunakan jalur Hadoop, salin skrip dari Cloud Storage ke tujuan yang ditentukan sebagai file:/// untuk memastikan skrip berada di sistem file lokal, bukan HDFS. Perintah sh berikutnya mereferensikan sistem file lokal secara otomatis dan tidak memerlukan awalan file:///.

gcloud dataproc jobs submit pig --cluster=${CLUSTER} --region=${REGION} \
    -e='fs -cp -f gs://${BUCKET}/hello.sh file:///tmp/hello.sh; sh chmod 750 /tmp/hello.sh; sh /tmp/hello.sh'

Atau, karena tugas Dataproc yang mengirimkan argumen --jars melakukan staging file ke dalam direktori sementara yang dibuat selama masa aktif tugas, Anda dapat menentukan skrip shell Cloud Storage sebagai argumen --jars:

gcloud dataproc jobs submit pig --cluster=${CLUSTER} --region=${REGION} \
    --jars=gs://${BUCKET}/hello.sh \
    -e='sh chmod 750 ${PWD}/hello.sh; sh ${PWD}/hello.sh'

Perhatikan bahwa argumen --jars juga dapat mereferensikan skrip lokal:

gcloud dataproc jobs submit pig --cluster=${CLUSTER} --region=${REGION} \
    --jars=hello.sh \
    -e='sh chmod 750 ${PWD}/hello.sh; sh ${PWD}/hello.sh'