Utilizzo del connettore Cloud Storage con Apache Spark


Questo tutorial mostra come eseguire il codice di esempio che utilizza il connettore Cloud Storage con Apache Spark.

Obiettivi

Scrivi un semplice job Spark di conteggio parole in Java, Scala o Python, quindi esegui il job su un cluster Dataproc.

Costi

In questo documento, utilizzi i seguenti componenti fatturabili di Google Cloud:

  • Compute Engine
  • Dataproc
  • Cloud Storage

Per generare una stima dei costi in base all'utilizzo previsto, utilizza il calcolatore prezzi.

I nuovi Google Cloud utenti potrebbero avere diritto a una prova gratuita.

Prima di iniziare

Segui i passaggi riportati di seguito per prepararti a eseguire il codice in questo tutorial.

  1. Configura il progetto. Se necessario, configura un progetto con le API Dataproc, Compute Engine e Cloud Storage abilitate e Google Cloud CLI installata sulla tua macchina locale.

    1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
    2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Go to project selector

    3. Make sure that billing is enabled for your Google Cloud project.

    4. Enable the Dataproc, Compute Engine, and Cloud Storage APIs.

      Enable the APIs

    5. Create a service account:

      1. In the Google Cloud console, go to the Create service account page.

        Go to Create service account
      2. Select your project.
      3. In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.

        In the Service account description field, enter a description. For example, Service account for quickstart.

      4. Click Create and continue.
      5. Grant the Project > Owner role to the service account.

        To grant the role, find the Select a role list, then select Project > Owner.

      6. Click Continue.
      7. Click Done to finish creating the service account.

        Do not close your browser window. You will use it in the next step.

    6. Create a service account key:

      1. In the Google Cloud console, click the email address for the service account that you created.
      2. Click Keys.
      3. Click Add key, and then click Create new key.
      4. Click Create. A JSON key file is downloaded to your computer.
      5. Click Close.
    7. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

    8. Install the Google Cloud CLI.

    9. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

    10. To initialize the gcloud CLI, run the following command:

      gcloud init
    11. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Go to project selector

    12. Make sure that billing is enabled for your Google Cloud project.

    13. Enable the Dataproc, Compute Engine, and Cloud Storage APIs.

      Enable the APIs

    14. Create a service account:

      1. In the Google Cloud console, go to the Create service account page.

        Go to Create service account
      2. Select your project.
      3. In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.

        In the Service account description field, enter a description. For example, Service account for quickstart.

      4. Click Create and continue.
      5. Grant the Project > Owner role to the service account.

        To grant the role, find the Select a role list, then select Project > Owner.

      6. Click Continue.
      7. Click Done to finish creating the service account.

        Do not close your browser window. You will use it in the next step.

    15. Create a service account key:

      1. In the Google Cloud console, click the email address for the service account that you created.
      2. Click Keys.
      3. Click Add key, and then click Create new key.
      4. Click Create. A JSON key file is downloaded to your computer.
      5. Click Close.
    16. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

    17. Install the Google Cloud CLI.

    18. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

    19. To initialize the gcloud CLI, run the following command:

      gcloud init
    20. Crea un bucket Cloud Storage. Ti serve un bucket Cloud Storage per archiviare i dati del tutorial. Se non ne hai uno pronto all'uso, crea un nuovo bucket nel tuo progetto.

      1. In the Google Cloud console, go to the Cloud Storage Buckets page.

        Go to Buckets

      2. Click Create.
      3. On the Create a bucket page, enter your bucket information. To go to the next step, click Continue.
        1. In the Get started section, do the following:
          • Enter a globally unique name that meets the bucket naming requirements.
          • To add a bucket label, expand the Labels section (), click Add label, and specify a key and a value for your label.
        2. In the Choose where to store your data section, do the following:
          1. Select a Location type.
          2. Choose a location where your bucket's data is permanently stored from the Location type drop-down menu.
          3. To set up cross-bucket replication, select Add cross-bucket replication via Storage Transfer Service and follow these steps:

            Set up cross-bucket replication

            1. In the Bucket menu, select a bucket.
            2. In the Replication settings section, click Configure to configure settings for the replication job.

              The Configure cross-bucket replication pane appears.

              • To filter objects to replicate by object name prefix, enter a prefix that you want to include or exclude objects from, then click Add a prefix.
              • To set a storage class for the replicated objects, select a storage class from the Storage class menu. If you skip this step, the replicated objects will use the destination bucket's storage class by default.
              • Click Done.
        3. In the Choose how to store your data section, do the following:
          1. Select a default storage class for the bucket or Autoclass for automatic storage class management of your bucket's data.
          2. To enable hierarchical namespace, in the Optimize storage for data-intensive workloads section, select Enable hierarchical namespace on this bucket.
        4. In the Choose how to control access to objects section, select whether or not your bucket enforces public access prevention, and select an access control method for your bucket's objects.
        5. In the Choose how to protect object data section, do the following:
          • Select any of the options under Data protection that you want to set for your bucket.
            • To enable soft delete, click the Soft delete policy (For data recovery) checkbox, and specify the number of days you want to retain objects after deletion.
            • To set Object Versioning, click the Object versioning (For version control) checkbox, and specify the maximum number of versions per object and the number of days after which the noncurrent versions expire.
            • To enable the retention policy on objects and buckets, click the Retention (For compliance) checkbox, and then do the following:
              • To enable Object Retention Lock, click the Enable object retention checkbox.
              • To enable Bucket Lock, click the Set bucket retention policy checkbox, and choose a unit of time and a length of time for your retention period.
          • To choose how your object data will be encrypted, expand the Data encryption section (), and select a Data encryption method.
      4. Click Create.

    21. Imposta le variabili di ambiente locali. Imposta le variabili di ambiente sulla tua macchina locale. Imposta il tuo Google Cloud project-id e il nome del bucket Cloud Storage che utilizzerai per questo tutorial. Fornisci anche il nome e la regione di un cluster Dataproc esistente o nuovo. Puoi creare un cluster da utilizzare in questo tutorial nel passaggio successivo.

      PROJECT=project-id
      
      BUCKET_NAME=bucket-name
      
      CLUSTER=cluster-name
      
      REGION=cluster-region Example: "us-central1"
      

    22. Crea un cluster Dataproc. Esegui il comando riportato di seguito per creare un cluster Dataproc a un solo nodo nella zona di Compute Engine specificata.

      gcloud dataproc clusters create ${CLUSTER} \
          --project=${PROJECT} \
          --region=${REGION} \
          --single-node
      

    23. Copia i dati pubblici nel tuo bucket Cloud Storage. Copia un frammento di testo di Shakespeare con dati pubblici nella cartella input del tuo bucket Cloud Storage:

      gcloud storage cp gs://pub/shakespeare/rose.txt \
          gs://${BUCKET_NAME}/input/rose.txt
      

    24. Configura un ambiente di sviluppo Java (Apache Maven), Scala (SBT) o Python.

Prepara il job Spark di conteggio parole

Seleziona una scheda di seguito per seguire i passaggi per preparare un pacchetto o un file di job da inviare al cluster. Puoi preparare uno dei seguenti tipi di lavoro:

Java

  1. Copia il file pom.xml nella macchina locale. Il seguente file pom.xml specifica le dipendenze delle librerie Scala e Spark, a cui viene assegnato un ambito provided per indicare che il cluster Dataproc fornirà queste librerie in fase di runtime. Il file pom.xml non specifica una dipendenza Cloud Storage perché il connettore implementa l'interfaccia HDFS standard. Quando un job Spark accede ai file del cluster Cloud Storage (file con URI che iniziano con gs://), il sistema utilizza automaticamente il connettore Cloud Storage per accedere ai file in Cloud Storage
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>dataproc.codelab</groupId>
      <artifactId>word-count</artifactId>
      <version>1.0</version>
    
      <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>Scala version, for example, 2.11.8</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_Scala major.minor.version, for example, 2.11</artifactId>
          <version>Spark version, for example, 2.3.1</version>
          <scope>provided</scope>
        </dependency>
      </dependencies>
    </project>
  2. Copia il codice WordCount.java elencato di seguito nel tuo computer locale.
    1. Crea un insieme di directory con il percorso src/main/java/dataproc/codelab:
      mkdir -p src/main/java/dataproc/codelab
      
    2. Copia WordCount.java nella tua macchina locale in src/main/java/dataproc/codelab:
      cp WordCount.java src/main/java/dataproc/codelab
      

    WordCount.java è un job Spark in Java che legge file di testo da Cloud Storage, esegue un conteggio delle parole e poi scrive i risultati del file di testo in Cloud Storage.

    package dataproc.codelab;
    
    import java.util.Arrays;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;
    
    public class WordCount {
      public static void main(String[] args) {
        if (args.length != 2) {
          throw new IllegalArgumentException("Exactly 2 arguments are required: <inputUri> <outputUri>");
        }
        String inputPath = args[0];
        String outputPath = args[1];
        JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("Word Count"));
        JavaRDD<String> lines = sparkContext.textFile(inputPath);
        JavaRDD<String> words = lines.flatMap(
            (String line) -> Arrays.asList(line.split(" ")).iterator()
        );
        JavaPairRDD<String, Integer> wordCounts = words.mapToPair(
            (String word) -> new Tuple2<>(word, 1)
        ).reduceByKey(
            (Integer count1, Integer count2) -> count1 + count2
        );
        wordCounts.saveAsTextFile(outputPath);
      }
    }
  3. Crea il pacchetto.
    mvn clean package
    
    Se la build ha esito positivo, viene creato un target/word-count-1.0.jar.
  4. Prepara il pacchetto per Cloud Storage.
    gcloud storage cp target/word-count-1.0.jar \
        gs://${BUCKET_NAME}/java/word-count-1.0.jar
    

Scala

  1. Copia il file build.sbt nella macchina locale. Il seguente file build.sbt specifica le dipendenze delle librerie Scala e Spark, a cui viene assegnato un ambito provided per indicare che il cluster Dataproc fornirà queste librerie in fase di runtime. Il file build.sbt non specifica una dipendenza Cloud Storage perché il connettore implementa l'interfaccia HDFS standard. Quando un job Spark accede ai file del cluster Cloud Storage (file con URI che iniziano con gs://), il sistema utilizza automaticamente il connettore Cloud Storage per accedere ai file in Cloud Storage
    scalaVersion := "Scala version, for example, 2.11.8"
    
    name := "word-count"
    organization := "dataproc.codelab"
    version := "1.0"
    
    libraryDependencies ++= Seq(
      "org.scala-lang" % "scala-library" % scalaVersion.value % "provided",
      "org.apache.spark" %% "spark-core" % "Spark version, for example, 2.3.1" % "provided"
    )
  2. Copia word-count.scala sulla tua macchina locale. Si tratta di un job Spark in Java che legge i file di testo da Cloud Storage, esegue un conteggio delle parole e poi scrive i risultati del file di testo in Cloud Storage.
    package dataproc.codelab
    
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    
    object WordCount {
      def main(args: Array[String]) {
        if (args.length != 2) {
          throw new IllegalArgumentException(
              "Exactly 2 arguments are required: <inputPath> <outputPath>")
        }
    
        val inputPath = args(0)
        val outputPath = args(1)
    
        val sc = new SparkContext(new SparkConf().setAppName("Word Count"))
        val lines = sc.textFile(inputPath)
        val words = lines.flatMap(line => line.split(" "))
        val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
        wordCounts.saveAsTextFile(outputPath)
      }
    }
  3. Crea il pacchetto.
    sbt clean package
    
    Se la build ha esito positivo, viene creato un target/scala-2.11/word-count_2.11-1.0.jar.
  4. Prepara il pacchetto per Cloud Storage.
    gcloud storage cp target/scala-2.11/word-count_2.11-1.0.jar \
        gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar
    

Python

  1. Copia word-count.py sulla tua macchina locale. Si tratta di un job Spark in Python che utilizza PySpark per leggere i file di testo da Cloud Storage, esegue un conteggio delle parole e poi scrive i risultati del file di testo in Cloud Storage.
    #!/usr/bin/env python
    
    import pyspark
    import sys
    
    if len(sys.argv) != 3:
      raise Exception("Exactly 2 arguments are required: <inputUri> <outputUri>")
    
    inputUri=sys.argv[1]
    outputUri=sys.argv[2]
    
    sc = pyspark.SparkContext()
    lines = sc.textFile(sys.argv[1])
    words = lines.flatMap(lambda line: line.split())
    wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda count1, count2: count1 + count2)
    wordCounts.saveAsTextFile(sys.argv[2])

Invia il job

Esegui il seguente comando gcloud per inviare il job di conteggio parole al cluster Dataproc.

Java

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/java/word-count-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Scala

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Python

gcloud dataproc jobs submit pyspark word-count.py \
    --cluster=${CLUSTER} \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Visualizzare l'output

Al termine del job, esegui questo comando gcloud CLI per visualizzare l'output di conteggio delle parole.

gcloud storage cat gs://${BUCKET_NAME}/output/*

L'output del conteggio delle parole dovrebbe essere simile al seguente:

(a,2)
(call,1)
(What's,1)
(sweet.,1)
(we,1)
(as,1)
(name?,1)
(any,1)
(other,1)
(rose,1)
(smell,1)
(name,1)
(would,1)
(in,1)
(which,1)
(That,1)
(By,1)

Esegui la pulizia

Al termine del tutorial, puoi liberare spazio eliminando le risorse che hai creato in modo che non utilizzino più la quota e non generino addebiti. Le seguenti sezioni descrivono come eliminare o disattivare queste risorse.

Elimina il progetto

Il modo più semplice per eliminare la fatturazione è quello di eliminare il progetto creato per il tutorial.

Per eliminare il progetto:

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Elimina il cluster Dataproc

Anziché eliminare il progetto, potresti voler eliminare solo il cluster al suo interno.

Elimina il bucket Cloud Storage

Google Cloud console

  1. In the Google Cloud console, go to the Cloud Storage Buckets page.

    Go to Buckets

  2. Click the checkbox for the bucket that you want to delete.
  3. To delete the bucket, click Delete, and then follow the instructions.

Riga di comando

    Elimina il bucket:
    gcloud storage buckets delete BUCKET_NAME

Passaggi successivi