本教程介绍了创建 Spark Scala 作业并将该作业提交到 Dataproc 集群的不同方法,其中包括如何执行以下操作:
- 使用 Scala REPL(Read-Evaluate-Print-Loop 或交互式解析器)或 SBT 构建工具,通过命令行在本地机器上编写和编译 Spark Scala“Hello World”应用
- 将经过编译的 Scala 类打包成一个包含清单的 jar 文件
- 将 Scala jar 提交到在 Dataproc 集群上运行的 Spark 作业
- 通过 Google Cloud 控制台检查 Scala 作业的输出
本教程还向您展示了如何执行以下操作:
使用
spark-shell
REPL 直接在 Dataproc 集群上编写和运行 Spark Scala“WordCount”mapreduce 作业在集群上运行预先安装的 Apache Spark 和 Hadoop 示例
设置 Google Cloud Platform 项目
如果您尚未对项目进行设置,请按以下步骤进行操作:
在本地编写和编译 Scala 代码
作为本教程的一个简单练习,请在开发机器上本地使用 Scala REPL 或 SBT 命令行界面编写“Hello World”Scala 应用。
使用 Scala
- 从 Scala 安装页面下载 Scala 二进制文件
如 Scala 安装说明中所示那样,解压缩该文件,设置
SCALA_HOME
环境变量,并将其添加到您的路径中。例如:export SCALA_HOME=/usr/local/share/scala export PATH=$PATH:$SCALA_HOME/
启动 Scala REPL
$ scala Welcome to Scala version ... Type in expressions to have them evaluated. Type :help for more information. scala>
将
HelloWorld
代码复制并粘贴到 Scala REPL 中object HelloWorld { def main(args: Array[String]): Unit = { println("Hello, world!") } }
保存
HelloWorld.scala
并退出 REPLscala> :save HelloWorld.scala scala> :q
使用
scalac
进行编译$ scalac HelloWorld.scala
列出已编译的
.class
文件$ ls HelloWorld*.class HelloWorld$.class HelloWorld.class
使用 SBT
创建一个“HelloWorld”项目,如下所示
$ mkdir hello $ cd hello $ echo \ 'object HelloWorld {def main(args: Array[String]) = println("Hello, world!")}' > \ HelloWorld.scala
创建一个
sbt.build
配置文件,以将artifactName
(您将在下面生成的 jar 文件的名称)设置为“HelloWorld.jar”(请参阅修改默认工件)echo \ 'artifactName := { (sv: ScalaVersion, module: ModuleID, artifact: Artifact) => "HelloWorld.jar" }' > \ build.sbt
启动 SBT 并运行代码
$ sbt [info] Set current project to hello ... > run ... Compiling 1 Scala source to .../hello/target/scala-.../classes... ... Running HelloWorld Hello, world! [success] Total time: 3 s ...
将代码打包成带有指定了主类入口点 (
HelloWorld
) 的清单的 jar 文件,然后退出> package ... Packaging .../hello/target/scala-.../HelloWorld.jar ... ... Done packaging. [success] Total time: ... > exit
创建 jar
使用 SBT 创建 jar
SBT 软件包命令可创建 jar 文件(请参阅使用 SBT)。
手动创建 jar
- 将目录 (
cd
) 更改为包含已编译的HelloWorld*.class
文件的目录,然后运行以下命令将类文件打包到在清单指定了主类入口点 (HelloWorld
) 的 jar 中。$ jar cvfe HelloWorld.jar HelloWorld HelloWorld*.class added manifest adding: HelloWorld$.class(in = 637) (out= 403)(deflated 36%) adding: HelloWorld.class(in = 586) (out= 482)(deflated 17%)
将 jar 复制到 Cloud Storage
- 使用 Google Cloud CLI 将 jar 复制到项目中的 Cloud Storage 存储桶
$ gcloud storage cp HelloWorld.jar gs://<bucket-name>/ Copying file://HelloWorld.jar [Content-Type=application/java-archive]... Uploading gs://bucket-name/HelloWorld.jar: 1.46 KiB/1.46 KiB
将 jar 提交到 Dataproc Spark 作业
使用 Google Cloud 控制台将 jar 文件提交到您的 Dataproc Spark 作业。按如下所示,填写提交作业页面上的字段:
- 集群:从集群列表中选择集群名称
- 作业类型:Spark
主类或 jar:指定 HelloWorld jar (
gs://your-bucket-name/HelloWorld.jar
) 的 Cloud Storage URI 路径。如果 jar 不包含指定了代码入口点(“Main-Class: HelloWorld”)的清单,则“主类或 jar”字段应声明主类的名称(“HelloWorld”),而您应使用 jar 文件的 URI 路径 (
gs://your-bucket-name/HelloWorld.jar
) 填写“Jar 文件”字段。
点击提交以启动作业。作业启动后,会被添加到作业列表中。
点击“作业 ID”以打开作业页面,您可以在该页面中查看作业的驱动程序输出。
使用集群的 spark-shell
REPL 编写并运行 Spark Scala 代码
建议您直接在自己的 Dataproc 集群上开发 Scala 应用。Hadoop 和 Spark 预先安装在 Dataproc 集群上,并且它们配置了 Cloud Storage 连接器,该连接器允许您的代码直接从 Cloud Storage 读取数据并将数据写入 Cloud Storage。
此示例向您展示了如何通过 SSH 连接到项目的 Dataproc 集群主服务器节点,然后使用 spark-shell REPL 创建并运行 Scala wordcount mapreduce 应用。
通过 SSH 连接到 Dataproc 集群的主节点
在 Google Cloud 控制台中,前往项目的 Dataproc 集群页面,然后点击集群的名称。
在集群详情页面上,选择虚拟机实例标签页,然后点击集群名称行右侧显示的 SSH 选项。
此时会在主节点上的主目录打开一个浏览器窗口
启动
spark-shell
$ spark-shell ... Using Scala version ... Type in expressions to have them evaluated. Type :help for more information. ... Spark context available as sc. ... SQL context available as sqlContext. scala>
从位于公共 Cloud Storage 中的莎士比亚作品文本片段创建 RDD(弹性分布式数据集)
scala> val text_file = sc.textFile("gs://pub/shakespeare/rose.txt")
针对该文本运行字数统计 mapreduce,然后显示
wordcounts
结果scala> val wordCounts = text_file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b) scala> wordCounts.collect ... Array((call,1), (What's,1), (sweet.,1), (we,1), (as,1), (name?,1), (any,1), (other,1), (rose,1), (smell,1), (name,1), (a,2), (would,1), (in,1), (which,1), (That,1), (By,1))
将计数保存到 Cloud Storage 中的
<bucket-name>/wordcounts-out
,然后退出scala-shell
scala> wordCounts.saveAsTextFile("gs://<bucket-name>/wordcounts-out/") scala> exit
使用 gcloud CLI 列出输出文件并显示文件内容
$ gcloud storage ls gs://bucket-name/wordcounts-out/ gs://spark-scala-demo-bucket/wordcounts-out/ gs://spark-scala-demo-bucket/wordcounts-out/_SUCCESS gs://spark-scala-demo-bucket/wordcounts-out/part-00000 gs://spark-scala-demo-bucket/wordcounts-out/part-00001
检查
gs://<bucket-name>/wordcounts-out/part-00000
内容$ gcloud storage cat gs://bucket-name/wordcounts-out/part-00000 (call,1) (What's,1) (sweet.,1) (we,1) (as,1) (name?,1) (any,1) (other,1)
运行预先安装的示例代码
Dataproc 主节点包含具有标准 Apache Hadoop 和 Spark 示例的可运行 jar 文件。
Jar 类型 | Master node /usr/lib/ location |
GitHub 来源 | Apache 文档 |
---|---|---|---|
Hadoop | hadoop-mapreduce/hadoop-mapreduce-examples.jar |
来源链接 | MapReduce 教程 |
Spark | spark/lib/spark-examples.jar |
来源链接 | Spark 示例 |
通过命令行向集群提交示例
您可以使用 Google Cloud CLI gcloud
命令行工具从本地开发机器提交示例(请参阅使用 Google Cloud 控制台从 Google Cloud 控制台提交作业)。
Hadoop WordCount 示例
gcloud dataproc jobs submit hadoop --cluster=cluster-name \ --region=region \ --jars=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \ --class=org.apache.hadoop.examples.WordCount \ -- URI of input file URI of output file
Spark WordCount 示例
gcloud dataproc jobs submit spark --cluster=cluster-name \ --region=region \ --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \ --class=org.apache.spark.examples.JavaWordCount \ -- URI of input file
关闭集群
为避免不断产生费用,请关闭本教程使用的集群并删除 Cloud Storage 资源(Cloud Storage 存储分区和文件)。
要关闭集群,请运行以下命令:
gcloud dataproc clusters delete cluster-name \ --region=region
要删除 Cloud Storage jar 文件,请运行以下命令:
gcloud storage rm gs://bucket-name/HelloWorld.jar
您可以使用以下命令删除存储分区及其所有文件夹和文件:
gcloud storage rm gs://bucket-name/ --recursive