在 Dataproc 上编写并运行 Spark Scala 作业

本教程介绍了创建 Spark Scala 作业并将该作业提交到 Dataproc 集群的不同方法,其中包括如何执行以下操作:

  • 使用 Scala REPL(Read-Evaluate-Print-Loop 或交互式解析器)或 SBT 构建工具,通过命令行在本地机器上编写和编译 Spark Scala“Hello World”应用
  • 将经过编译的 Scala 类打包成一个包含清单的 jar 文件
  • 将 Scala jar 提交到在 Dataproc 集群上运行的 Spark 作业
  • 通过 Google Cloud 控制台检查 Scala 作业的输出

本教程还向您展示了如何执行以下操作:

  • 使用 spark-shell REPL 直接在 Dataproc 集群上编写和运行 Spark Scala“WordCount”mapreduce 作业

  • 在集群上运行预先安装的 Apache Spark 和 Hadoop 示例

设置 Google Cloud Platform 项目

如果您尚未对项目进行设置,请按以下步骤进行操作:

  1. 设置项目
  2. 创建 Cloud Storage 存储分区
  3. 创建 Dataproc 集群

在本地编写和编译 Scala 代码

作为本教程的一个简单练习,请在开发机器上本地使用 Scala REPLSBT 命令行界面编写“Hello World”Scala 应用。

使用 Scala

  1. Scala 安装页面下载 Scala 二进制文件
  2. Scala 安装说明中所示那样,解压缩该文件,设置 SCALA_HOME 环境变量,并将其添加到您的路径中。例如:

    export SCALA_HOME=/usr/local/share/scala
    export PATH=$PATH:$SCALA_HOME/
    

  3. 启动 Scala REPL

    $ scala
    Welcome to Scala version ...
    Type in expressions to have them evaluated.
    Type :help for more information.
    scala>
    

  4. HelloWorld 代码复制并粘贴到 Scala REPL 中

    object HelloWorld {
      def main(args: Array[String]): Unit = {
        println("Hello, world!")
      }
    }

  5. 保存 HelloWorld.scala 并退出 REPL

    scala> :save HelloWorld.scala
    scala> :q
    

  6. 使用 scalac 进行编译

    $ scalac HelloWorld.scala
    

  7. 列出已编译的 .class 文件

    $ ls HelloWorld*.class
    HelloWorld$.class   HelloWorld.class
    

使用 SBT

  1. 下载 SBT

  2. 创建一个“HelloWorld”项目,如下所示

    $ mkdir hello
    $ cd hello
    $ echo \
    'object HelloWorld {def main(args: Array[String]) = println("Hello, world!")}' > \
    HelloWorld.scala
    

  3. 创建一个 sbt.build 配置文件,以将 artifactName(您将在下面生成的 jar 文件的名称)设置为“HelloWorld.jar”(请参阅修改默认工件

    echo \
    'artifactName := { (sv: ScalaVersion, module: ModuleID, artifact: Artifact) =>
    "HelloWorld.jar" }' > \
    build.sbt
    

  4. 启动 SBT 并运行代码

    $ sbt
    [info] Set current project to hello ...
    > run
    ... Compiling 1 Scala source to .../hello/target/scala-.../classes...
    ... Running HelloWorld
    Hello, world!
    [success] Total time: 3 s ...
    

  5. 将代码打包成带有指定了主类入口点 (HelloWorld) 的清单的 jar 文件,然后退出

    > package
    ... Packaging .../hello/target/scala-.../HelloWorld.jar ...
    ... Done packaging.
    [success] Total time: ...
    > exit
    

创建 jar

使用 SBT 或使用 jar 命令创建 jar 文件

使用 SBT 创建 jar

SBT 软件包命令可创建 jar 文件(请参阅使用 SBT)。

手动创建 jar

  1. 将目录 (cd) 更改为包含已编译的 HelloWorld*.class 文件的目录,然后运行以下命令将类文件打包到在清单指定了主类入口点 (HelloWorld) 的 jar 中。
    $ jar cvfe HelloWorld.jar HelloWorld HelloWorld*.class
    added manifest
    adding: HelloWorld$.class(in = 637) (out= 403)(deflated 36%)
    adding: HelloWorld.class(in = 586) (out= 482)(deflated 17%)
    

将 jar 复制到 Cloud Storage

  1. 使用 Google Cloud CLI 将 jar 复制到项目中的 Cloud Storage 存储桶
$ gcloud storage cp HelloWorld.jar gs://<bucket-name>/
Copying file://HelloWorld.jar [Content-Type=application/java-archive]...
Uploading   gs://bucket-name/HelloWorld.jar:         1.46 KiB/1.46 KiB

将 jar 提交到 Dataproc Spark 作业

  1. 使用 Google Cloud 控制台将 jar 文件提交到您的 Dataproc Spark 作业。按如下所示,填写提交作业页面上的字段:

    • 集群:从集群列表中选择集群名称
    • 作业类型:Spark
    • 主类或 jar:指定 HelloWorld jar (gs://your-bucket-name/HelloWorld.jar) 的 Cloud Storage URI 路径。

      如果 jar 不包含指定了代码入口点(“Main-Class: HelloWorld”)的清单,则“主类或 jar”字段应声明主类的名称(“HelloWorld”),而您应使用 jar 文件的 URI 路径 (gs://your-bucket-name/HelloWorld.jar) 填写“Jar 文件”字段。

  2. 点击提交以启动作业。作业启动后,会被添加到作业列表中。

  3. 点击“作业 ID”以打开作业页面,您可以在该页面中查看作业的驱动程序输出。

使用集群的 spark-shell REPL 编写并运行 Spark Scala 代码

建议您直接在自己的 Dataproc 集群上开发 Scala 应用。Hadoop 和 Spark 预先安装在 Dataproc 集群上,并且它们配置了 Cloud Storage 连接器,该连接器允许您的代码直接从 Cloud Storage 读取数据并将数据写入 Cloud Storage。

此示例向您展示了如何通过 SSH 连接到项目的 Dataproc 集群主服务器节点,然后使用 spark-shell REPL 创建并运行 Scala wordcount mapreduce 应用。

  1. 通过 SSH 连接到 Dataproc 集群的主节点

    1. 在 Google Cloud 控制台中,前往项目的 Dataproc 集群页面,然后点击集群的名称。

    2. 在集群详情页面上,选择虚拟机实例标签页,然后点击集群名称行右侧显示的 SSH 选项。

      此时会在主节点上的主目录打开一个浏览器窗口

  2. 启动 spark-shell

    $ spark-shell
    ...
    Using Scala version ...
    Type in expressions to have them evaluated.
    Type :help for more information.
    ...
    Spark context available as sc.
    ...
    SQL context available as sqlContext.
    scala>
    

  3. 从位于公共 Cloud Storage 中的莎士比亚作品文本片段创建 RDD(弹性分布式数据集)

    scala> val text_file = sc.textFile("gs://pub/shakespeare/rose.txt")
    

  4. 针对该文本运行字数统计 mapreduce,然后显示 wordcounts 结果

    scala> val wordCounts = text_file.flatMap(line => line.split(" ")).map(word =>
    (word, 1)).reduceByKey((a, b) => a + b)
    scala> wordCounts.collect
    ... Array((call,1), (What's,1), (sweet.,1), (we,1), (as,1), (name?,1), (any,1), (other,1),
    (rose,1), (smell,1), (name,1), (a,2), (would,1), (in,1), (which,1), (That,1), (By,1))
    

  5. 将计数保存到 Cloud Storage 中的 <bucket-name>/wordcounts-out,然后退出 scala-shell

    scala> wordCounts.saveAsTextFile("gs://<bucket-name>/wordcounts-out/")
    scala> exit
    

  6. 使用 gcloud CLI 列出输出文件并显示文件内容

    $ gcloud storage ls gs://bucket-name/wordcounts-out/
    gs://spark-scala-demo-bucket/wordcounts-out/
    gs://spark-scala-demo-bucket/wordcounts-out/_SUCCESS
    gs://spark-scala-demo-bucket/wordcounts-out/part-00000
    gs://spark-scala-demo-bucket/wordcounts-out/part-00001
    

  7. 检查 gs://<bucket-name>/wordcounts-out/part-00000 内容

    $ gcloud storage cat gs://bucket-name/wordcounts-out/part-00000
    (call,1)
    (What's,1)
    (sweet.,1)
    (we,1)
    (as,1)
    (name?,1)
    (any,1)
    (other,1)
    

运行预先安装的示例代码

Dataproc 主节点包含具有标准 Apache Hadoop 和 Spark 示例的可运行 jar 文件。

Jar 类型 Master node /usr/lib/ location GitHub 来源 Apache 文档
Hadoop hadoop-mapreduce/hadoop-mapreduce-examples.jar 来源链接 MapReduce 教程
Spark spark/lib/spark-examples.jar 来源链接 Spark 示例

通过命令行向集群提交示例

您可以使用 Google Cloud CLI gcloud 命令行工具从本地开发机器提交示例(请参阅使用 Google Cloud 控制台从 Google Cloud 控制台提交作业)。

Hadoop WordCount 示例

gcloud dataproc jobs submit hadoop --cluster=cluster-name \
    --region=region \
    --jars=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
    --class=org.apache.hadoop.examples.WordCount \
    -- URI of input file URI of output file

Spark WordCount 示例

gcloud dataproc jobs submit spark --cluster=cluster-name \
    --region=region \
    --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
    --class=org.apache.spark.examples.JavaWordCount \
    -- URI of input file

关闭集群

为避免不断产生费用,请关闭本教程使用的集群并删除 Cloud Storage 资源(Cloud Storage 存储分区和文件)。

要关闭集群,请运行以下命令:

gcloud dataproc clusters delete cluster-name \
    --region=region

要删除 Cloud Storage jar 文件,请运行以下命令:

gcloud storage rm gs://bucket-name/HelloWorld.jar

您可以使用以下命令删除存储分区及其所有文件夹和文件:

gcloud storage rm gs://bucket-name/ --recursive

后续步骤