Use the Cloud Storage connector with Apache Spark


This tutorial shows you how to run example code that uses the Cloud Storage connector with Apache Spark.

Objectives

Write a simple Spark wordcount job in Java, Scala, or Python, then run the job on a Dataproc cluster.

Costs

In this document, you use the following billable components of Google Cloud:

  • Compute Engine
  • Dataproc
  • Cloud Storage

To generate a cost estimate based on your projected usage, use the pricing calculator.

New Google Cloud users might be eligible for a free trial.

Before you begin

Complete the steps below to prepare to run the code in this tutorial.

  1. Set up your project. If necessary, set up a project with the Dataproc, Compute Engine, and Cloud Storage APIs enabled and with the Google Cloud CLI installed on your local machine.

    1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
    2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    3. Verify that billing is enabled for your Google Cloud project.

    4. Enable the Dataproc, Compute Engine, and Cloud Storage APIs.

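      If you prefer the command line, the APIs can also be enabled with the gcloud CLI. A minimal sketch, assuming the gcloud CLI is installed and initialized:

      gcloud services enable dataproc.googleapis.com \
          compute.googleapis.com \
          storage.googleapis.com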

    5. Create a service account:

      1. In the Google Cloud console, go to the Create service account page.

      2. Select your project.
      3. In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.

        In the Service account description field, enter a description. For example, Service account for quickstart.

      4. Click Create and continue.
      5. Grant the Project > Owner role to the service account.

        To grant the role, find the Select a role list, then select Project > Owner.

      6. Click Continue.
      7. Click Done to finish creating the service account.

        Do not close your browser window. You will use it in the next step.

    6. Create a service account key:

      1. In the Google Cloud console, click the email address for the service account that you created.
      2. Click Keys.
      3. Click Add key, and then click Create new key.
      4. Click Create. A JSON key file is downloaded to your computer.
      5. Click Close.
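
      If you prefer the command line, the service account and key (steps 5 and 6) can also be created with the gcloud CLI. A minimal sketch, where sa-name and key.json are placeholders you choose and project-id is your project ID:

      # Create the service account (sa-name is a placeholder).
      gcloud iam service-accounts create sa-name

      # Grant the Project > Owner role to the service account.
      gcloud projects add-iam-policy-binding project-id \
          --member="serviceAccount:sa-name@project-id.iam.gserviceaccount.com" \
          --role="roles/owner"

      # Create and download a JSON key file.
      gcloud iam service-accounts keys create key.json \
          --iam-account=sa-name@project-id.iam.gserviceaccount.com
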
    7. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.
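
      For example, in a bash shell (the key file path is a placeholder for wherever you saved your key):

      export GOOGLE_APPLICATION_CREDENTIALS="/path/to/key.json"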

    8. Install the Google Cloud CLI.

    9. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
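
      For example, you can typically start a browser-based sign-in with:

      gcloud auth login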

    10. To initialize the gcloud CLI, run the following command:

      gcloud init
  2. Create a Cloud Storage bucket. You need Cloud Storage to hold the tutorial data. If you don't have a bucket ready to use, create a new bucket in your project, either with the console steps below or with the gcloud sketch that follows them.

      1. In the Google Cloud console, go to the Cloud Storage Buckets page.

      2. Click Create.
      3. On the Create a bucket page, enter your bucket information. To go to the next step, click Continue.
        1. In the Get started section, do the following:
          • Enter a globally unique name that meets the bucket naming requirements.
          • To add a bucket label, expand the Labels section, click Add label, and specify a key and a value for your label.
        2. In the Choose where to store your data section, do the following:
          1. Select a Location type.
          2. Choose a location where your bucket's data is permanently stored from the Location type drop-down menu.
          3. To set up cross-bucket replication, select Add cross-bucket replication via Storage Transfer Service and follow these steps:

            1. In the Bucket menu, select a bucket.
            2. In the Replication settings section, click Configure to configure settings for the replication job.

              The Configure cross-bucket replication pane appears.

              • To filter objects to replicate by object name prefix, enter a prefix that you want to include or exclude objects from, then click Add a prefix.
              • To set a storage class for the replicated objects, select a storage class from the Storage class menu. If you skip this step, the replicated objects will use the destination bucket's storage class by default.
              • Click Done.
        3. In the Choose how to store your data section, do the following:
          1. Select a default storage class for the bucket or Autoclass for automatic storage class management of your bucket's data.
          2. To enable hierarchical namespace, in the Optimize storage for data-intensive workloads section, select Enable hierarchical namespace on this bucket.
        4. In the Choose how to control access to objects section, select whether or not your bucket enforces public access prevention, and select an access control method for your bucket's objects.
        5. In the Choose how to protect object data section, do the following:
          • Select any of the options under Data protection that you want to set for your bucket.
            • To enable soft delete, click the Soft delete policy (For data recovery) checkbox, and specify the number of days you want to retain objects after deletion.
            • To set Object Versioning, click the Object versioning (For version control) checkbox, and specify the maximum number of versions per object and the number of days after which the noncurrent versions expire.
            • To enable the retention policy on objects and buckets, click the Retention (For compliance) checkbox, and then do the following:
              • To enable Object Retention Lock, click the Enable object retention checkbox.
              • To enable Bucket Lock, click the Set bucket retention policy checkbox, and choose a unit of time and a length of time for your retention period.
          • To choose how your object data will be encrypted, expand the Data encryption section, and select a Data encryption method.
      4. Click Create.
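
      Alternatively, a bucket can be created from the command line. A minimal sketch, where bucket-name is a placeholder and the location is an example:

      gcloud storage buckets create gs://bucket-name \
          --location=us-central1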

  3. Set local environment variables. Set environment variables on your local machine: the Google Cloud project ID and the name of the Cloud Storage bucket that you will use for this tutorial, plus the name and region of a new or existing Dataproc cluster. You can create a cluster to use in this tutorial in the next step.

      PROJECT=project-id
      BUCKET_NAME=bucket-name
      CLUSTER=cluster-name
      REGION=cluster-region  # example: "us-central1"
      

  4. Create a Dataproc cluster. Run the command below to create a single-node Dataproc cluster in the specified Compute Engine region.

      gcloud dataproc clusters create ${CLUSTER} \
          --project=${PROJECT} \
          --region=${REGION} \
          --single-node
      

  5. Copy public data to your Cloud Storage bucket. Copy a Shakespeare text snippet from public data into the input folder of your Cloud Storage bucket:

      gcloud storage cp gs://pub/shakespeare/rose.txt \
          gs://${BUCKET_NAME}/input/rose.txt
      

  6. Set up a Java (Apache Maven), Scala (SBT), or Python development environment.

      Prepare the Spark wordcount job

      Choose a tab below to follow the steps to prepare a job package or file to submit to your cluster. You can prepare one of the following job types:

      Java

      1. Copy the pom.xml file to your local machine. The following pom.xml file specifies Scala and Spark library dependencies, which are given the provided scope to indicate that the Dataproc cluster will provide these libraries at runtime. The pom.xml file doesn't specify a Cloud Storage dependency because the connector implements the standard HDFS interface. When a Spark job accesses Cloud Storage files (URIs that begin with gs://), the system automatically uses the Cloud Storage connector to access the files in Cloud Storage.
        <?xml version="1.0" encoding="UTF-8"?>
        <project xmlns="http://maven.apache.org/POM/4.0.0"
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
          <modelVersion>4.0.0</modelVersion>
        
          <groupId>dataproc.codelab</groupId>
          <artifactId>word-count</artifactId>
          <version>1.0</version>
        
          <properties>
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
          </properties>
        
          <dependencies>
            <dependency>
              <groupId>org.scala-lang</groupId>
              <artifactId>scala-library</artifactId>
              <version>Scala version, for example, 2.11.8</version>
              <scope>provided</scope>
            </dependency>
            <dependency>
              <groupId>org.apache.spark</groupId>
              <artifactId>spark-core_Scala major.minor.version, for example, 2.11</artifactId>
              <version>Spark version, for example, 2.3.1</version>
              <scope>provided</scope>
            </dependency>
          </dependencies>
        </project>
      2. Copy the WordCount.java code listed below to your local machine.
        1. Create a set of directories with the path src/main/java/dataproc/codelab:
          mkdir -p src/main/java/dataproc/codelab
          
        2. Copy WordCount.java on your local machine into src/main/java/dataproc/codelab:
          cp WordCount.java src/main/java/dataproc/codelab
          

        WordCount.java is a Spark job in Java that reads a text file from Cloud Storage, performs a word count, and then writes the text file results to Cloud Storage.

        package dataproc.codelab;
        
        import java.util.Arrays;
        import org.apache.spark.SparkConf;
        import org.apache.spark.api.java.JavaPairRDD;
        import org.apache.spark.api.java.JavaRDD;
        import org.apache.spark.api.java.JavaSparkContext;
        import scala.Tuple2;
        
        public class WordCount {
          public static void main(String[] args) {
            if (args.length != 2) {
              throw new IllegalArgumentException("Exactly 2 arguments are required: <inputUri> <outputUri>");
            }
            String inputPath = args[0];
            String outputPath = args[1];
            JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("Word Count"));
            JavaRDD<String> lines = sparkContext.textFile(inputPath);
            JavaRDD<String> words = lines.flatMap(
                (String line) -> Arrays.asList(line.split(" ")).iterator()
            );
            JavaPairRDD<String, Integer> wordCounts = words.mapToPair(
                (String word) -> new Tuple2<>(word, 1)
            ).reduceByKey(
                (Integer count1, Integer count2) -> count1 + count2
            );
            wordCounts.saveAsTextFile(outputPath);
          }
        }
      3. Build the package.
        mvn clean package
        
        If the build succeeds, target/word-count-1.0.jar is created.
      4. Stage the package to Cloud Storage.
        gcloud storage cp target/word-count-1.0.jar \
            gs://${BUCKET_NAME}/java/word-count-1.0.jar
        

      Scala

      1. Copy the build.sbt file to your local machine. The following build.sbt file specifies Scala and Spark library dependencies, which are given the provided scope to indicate that the Dataproc cluster will provide these libraries at runtime. The build.sbt file doesn't specify a Cloud Storage dependency because the connector implements the standard HDFS interface. When a Spark job accesses Cloud Storage files (URIs that begin with gs://), the system automatically uses the Cloud Storage connector to access the files in Cloud Storage.
        scalaVersion := "Scala version, for example, 2.11.8"
        
        name := "word-count"
        organization := "dataproc.codelab"
        version := "1.0"
        
        libraryDependencies ++= Seq(
          "org.scala-lang" % "scala-library" % scalaVersion.value % "provided",
          "org.apache.spark" %% "spark-core" % "Spark version, for example, 2.3.1" % "provided"
        )
      2. Copy word-count.scala to your local machine. This is a Spark job in Scala that reads a text file from Cloud Storage, performs a word count, and then writes the text file results to Cloud Storage.
        package dataproc.codelab
        
        import org.apache.spark.SparkContext
        import org.apache.spark.SparkConf
        
        object WordCount {
          def main(args: Array[String]) {
            if (args.length != 2) {
              throw new IllegalArgumentException(
                  "Exactly 2 arguments are required: <inputPath> <outputPath>")
            }
        
            val inputPath = args(0)
            val outputPath = args(1)
        
            val sc = new SparkContext(new SparkConf().setAppName("Word Count"))
            val lines = sc.textFile(inputPath)
            val words = lines.flatMap(line => line.split(" "))
            val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
            wordCounts.saveAsTextFile(outputPath)
          }
        }
      3. Build the package.
        sbt clean package
        
        If the build succeeds, target/scala-2.11/word-count_2.11-1.0.jar is created.
      4. Stage the package to Cloud Storage.
        gcloud storage cp target/scala-2.11/word-count_2.11-1.0.jar \
            gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar
        

      Python

      1. Copy word-count.py to your local machine. This is a Spark job in Python that uses PySpark to read a text file from Cloud Storage, perform a word count, and then write the text file results to Cloud Storage.
        #!/usr/bin/env python

        import sys

        import pyspark

        if len(sys.argv) != 3:
          raise Exception("Exactly 2 arguments are required: <inputUri> <outputUri>")

        inputUri = sys.argv[1]
        outputUri = sys.argv[2]

        # Read the input, count the words, and write the (word, count) pairs to the output URI.
        sc = pyspark.SparkContext()
        lines = sc.textFile(inputUri)
        words = lines.flatMap(lambda line: line.split())
        wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda count1, count2: count1 + count2)
        wordCounts.saveAsTextFile(outputUri)

      Submit the job

      Run the following gcloud command to submit the wordcount job to your Dataproc cluster.

      Java

      gcloud dataproc jobs submit spark \
          --cluster=${CLUSTER} \
          --class=dataproc.codelab.WordCount \
          --jars=gs://${BUCKET_NAME}/java/word-count-1.0.jar \
          --region=${REGION} \
          -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/
      

      Scala

      gcloud dataproc jobs submit spark \
          --cluster=${CLUSTER} \
          --class=dataproc.codelab.WordCount \
          --jars=gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar \
          --region=${REGION} \
          -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/
      

      Python

      gcloud dataproc jobs submit pyspark word-count.py \
          --cluster=${CLUSTER} \
          --region=${REGION} \
          -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/
      

      View the output

      After the job finishes, run the following gcloud CLI command to view the wordcount output.

      gcloud storage cat gs://${BUCKET_NAME}/output/*
      

      The wordcount output will be similar to the following:

      (a,2)
      (call,1)
      (What's,1)
      (sweet.,1)
      (we,1)
      (as,1)
      (name?,1)
      (any,1)
      (other,1)
      (rose,1)
      (smell,1)
      (name,1)
      (would,1)
      (in,1)
      (which,1)
      (That,1)
      (By,1)
      

      Clean up

      After you've finished the tutorial, you can clean up the resources that you created so that they stop using quota and incurring charges. The following sections describe how to delete or turn off these resources.

      Delete the project

      The easiest way to eliminate billing is to delete the project that you created for the tutorial.

      To delete the project:

      1. In the Google Cloud console, go to the Manage resources page.

      2. In the project list, select the project that you want to delete, and then click Delete.
      3. In the dialog, type the project ID, and then click Shut down to delete the project.

      Delete the Dataproc cluster

      Rather than deleting the project, you may want to delete only the cluster within the project, as shown in the following sketch.
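
      For example, using the environment variables set earlier in this tutorial:

      gcloud dataproc clusters delete ${CLUSTER} \
          --region=${REGION}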

      Delete the Cloud Storage bucket

    1. In the Google Cloud console, go to the Cloud Storage Buckets page.

    2. Click the checkbox for the bucket that you want to delete.
    3. To delete the bucket, click Delete, and then follow the instructions.
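
      Alternatively, with the BUCKET_NAME variable set earlier, the bucket and its contents can be deleted from the command line:

      gcloud storage rm --recursive gs://${BUCKET_NAME}
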
      Next steps