このチュートリアルでは、Spark Scala ジョブを作成し、Dataproc クラスタに送信するさまざまな方法について説明します。ここで説明する方法は次のとおりです。
- Scala REPL(Read-Evaluate-Print-Loop またはインタラクティブ インタープリタ)または SBT ビルドツールを使用して、コマンドラインから、ローカルマシンで Spark Scala の「Hello World」アプリを作成しコンパイルする
- コンパイルした Scala クラスをマニフェストとともに jar ファイルにコンパイルする
- Dataproc クラスタで実行される Spark ジョブに Scala jar を送信する
- Google Cloud Console からの Scala ジョブ出力を確認する
このチュートリアルでは、次の方法についても説明します。
Dataproc クラスタで、
spark-shell
REPL を使用して Spark Scala の「WordCount」MapReduce ジョブを直接作成し実行するプレインストールされた Apache Spark と Hadoop のサンプルをクラスタで実行する
Google Cloud Platform プロジェクトの設定
次の手順をまだ行っていない場合は、行います。
ローカルでの Scala コードの作成とコンパイル
このチュートリアルの簡単な演習として、Scala REPL または SBT コマンドライン インターフェースを使用し、ローカルで「Hello World」Scala アプリを作成します。
Scala を使用する
- Scala のインストール ページから Scala バイナリをダウンロードします。
Scala のインストールの手順に沿って、ファイルを解凍し、
SCALA_HOME
環境変数を設定して、パスに追加します。次に例を示します。export SCALA_HOME=/usr/local/share/scala export PATH=$PATH:$SCALA_HOME/
Scala REPL を起動します。
$ scala Welcome to Scala version ... Type in expressions to have them evaluated. Type :help for more information. scala>
HelloWorld
コードをコピーして Scala REPL に貼り付けます。object HelloWorld { def main(args: Array[String]): Unit = { println("Hello, world!") } }
HelloWorld.scala
を保存して REPL を終了します。scala> :save HelloWorld.scala scala> :q
scalac
をコンパイルします。$ scalac HelloWorld.scala
コンパイル済みの
.class
ファイルを一覧表示します。$ ls HelloWorld*.class HelloWorld$.class HelloWorld.class
SBT を使用する
次のように、「HelloWorld」プロジェクトを作成します。
$ mkdir hello $ cd hello $ echo \ 'object HelloWorld {def main(args: Array[String]) = println("Hello, world!")}' > \ HelloWorld.scala
sbt.build
構成ファイルを作成して「HelloWorld.jar」にartifactName
(生成した jar ファイル名は下記に表示)を設定します(デフォルトのアーティファクトの変更をご覧ください)。echo \ 'artifactName := { (sv: ScalaVersion, module: ModuleID, artifact: Artifact) => "HelloWorld.jar" }' > \ build.sbt
SBT を起動し、コードを実行します。
$ sbt [info] Set current project to hello ... > run ... Compiling 1 Scala source to .../hello/target/scala-.../classes... ... Running HelloWorld Hello, world! [success] Total time: 3 s ...
コードをメインクラスのエントリ ポイントを指定するマニフェスト(
HelloWorld
)とともに jar ファイルにパッケージ化し、終了します。> package ... Packaging .../hello/target/scala-.../HelloWorld.jar ... ... Done packaging. [success] Total time: ... > exit
jar の作成
SBT
または jar コマンドを使用して jar ファイルを作成します。
SBT で jar を作成する
SBT の package コマンドで jar ファイルを作成します(SBT を使用するをご覧ください)。
手動で jar を作成する
- コンパイル済みの
HelloWorld*.class
ファイルを含むディレクトリにディレクトリを変更(cd
)し、次のコマンドを実行して、メインクラスのエントリ ポイント(HelloWorld
)を指定するマニフェストとともにクラスファイルを jar ファイルにパッケージ化します。$ jar cvfe HelloWorld.jar HelloWorld HelloWorld*.class added manifest adding: HelloWorld$.class(in = 637) (out= 403)(deflated 36%) adding: HelloWorld.class(in = 586) (out= 482)(deflated 17%)
Cloud Storage への jar のコピー
- Google Cloud CLI を使用して、jar をプロジェクトの Cloud Storage バケットにコピーします。
$ gcloud storage cp HelloWorld.jar gs://<bucket-name>/ Copying file://HelloWorld.jar [Content-Type=application/java-archive]... Uploading gs://bucket-name/HelloWorld.jar: 1.46 KiB/1.46 KiB
Dataproc Spark ジョブに jar を送信する
Google Cloud コンソール を使用して、jar ファイルを Dataproc Spark ジョブに送信します。[ジョブの送信] ページの項目に次のように入力します。
- [クラスタ]: クラスタリストからクラスタの名前を選択します。
- [ジョブタイプ]: Spark
[メインクラスまたは JAR]: HelloWorld jar への Cloud Storage URI パスを指定します(
gs://your-bucket-name/HelloWorld.jar
)。コードへのエントリ ポイント(「Main-Class: HelloWorld」)を指定するマニフェストが jar に含まれていない場合は、[メインクラスまたは JAR] 項目にメインクラスの名前(「HelloWorld」)を指定し、[JAR ファイル] 項目に jar ファイルへの URI パス(
gs://your-bucket-name/HelloWorld.jar
)を入力する必要があります。
[送信] をクリックしてジョブを開始します。開始されたジョブが [ジョブ] リストに追加されます。
ジョブ ID をクリックして [ジョブ] ページを開くと、ジョブのドライバ出力が表示されます。
クラスタの spark-shell
REPL を使用した Spark Scala コードの作成と実行
Scala アプリは Dataproc クラスタで直接開発することをおすすめします。Hadoop と Spark は Dataproc クラスタにプレインストールされており、Cloud Storage コネクタを使用して構成されています。これにより、Cloud Storage との間で直接データの読み取りや書き込みができます。
この例では、プロジェクトの Dataproc クラスタ マスターノードに SSH 接続し、spark-shell REPL を使用して Scala wordcount mapreduce アプリケーションを作成し実行する方法を示します。
Dataproc クラスタのマスターノードに SSH で接続します。
Google Cloud コンソールでプロジェクトの Dataproc の [クラスタ] ページに移動し、クラスタの名前をクリックします。
クラスタの詳細ページで [VM インスタンス] タブを選択し、クラスタの名前がある行の右側に表示される [SSH] 選択ボタンをクリックします。
マスターノード上のホーム ディレクトリでブラウザ ウィンドウが開きます。
spark-shell
を起動します。$ spark-shell ... Using Scala version ... Type in expressions to have them evaluated. Type :help for more information. ... Spark context available as sc. ... SQL context available as sqlContext. scala>
一般に公開されている Cloud Storage にあるシェイクスピア テキスト スニペットから RDD(耐障害性分散データセット)を作成する
scala> val text_file = sc.textFile("gs://pub/shakespeare/rose.txt")
テキストで wordcount mapreduce を実行し、
wordcounts
の結果を表示します。scala> val wordCounts = text_file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b) scala> wordCounts.collect ... Array((call,1), (What's,1), (sweet.,1), (we,1), (as,1), (name?,1), (any,1), (other,1), (rose,1), (smell,1), (name,1), (a,2), (would,1), (in,1), (which,1), (That,1), (By,1))
Cloud Storage の
<bucket-name>/wordcounts-out
のカウントを保存し、scala-shell
を終了します。scala> wordCounts.saveAsTextFile("gs://<bucket-name>/wordcounts-out/") scala> exit
gcloud CLI を使用して出力ファイルを一覧表示し、ファイルの内容を表示する
$ gcloud storage ls gs://bucket-name/wordcounts-out/ gs://spark-scala-demo-bucket/wordcounts-out/ gs://spark-scala-demo-bucket/wordcounts-out/_SUCCESS gs://spark-scala-demo-bucket/wordcounts-out/part-00000 gs://spark-scala-demo-bucket/wordcounts-out/part-00001
gs://<bucket-name>/wordcounts-out/part-00000
の内容を確認します。$ gcloud storage cat gs://bucket-name/wordcounts-out/part-00000 (call,1) (What's,1) (sweet.,1) (we,1) (as,1) (name?,1) (any,1) (other,1)
プレインストールされたサンプルコードの実行
Dataproc のマスターノードには、実行可能な jar ファイルが標準の Apache Hadoop と Spark のサンプルとともに含まれています。
Jar タイプ | Master node /usr/lib/ location |
GitHub のソース | Apache ドキュメント |
---|---|---|---|
Hadoop | hadoop-mapreduce/hadoop-mapreduce-examples.jar |
ソースリンク | MapReduce のチュートリアル |
Spark | spark/lib/spark-examples.jar |
ソースリンク | Spark のサンプル |
クラスタへのコマンドラインからのサンプルの送信
Google Cloud CLI gcloud
コマンドライン ツールを使用して、ローカル開発マシンからサンプルを送信できます。(Google Cloud コンソールからジョブを送信するには、Google Cloud コンソールの使用をご覧ください)。
Hadoop WordCount のサンプル
gcloud dataproc jobs submit hadoop --cluster=cluster-name \ --region=region \ --jars=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \ --class=org.apache.hadoop.examples.WordCount \ -- URI of input file URI of output file
Spark WordCount サンプル
gcloud dataproc jobs submit spark --cluster=cluster-name \ --region=region \ --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \ --class=org.apache.spark.examples.JavaWordCount \ -- URI of input file
クラスタのシャットダウン
課金され続けることがないようにするために、クラスタをシャットダウンし、このチュートリアルで使用した Cloud Storage リソース(Cloud Storage バケットとファイル)を削除します。
クラスタをシャットダウンするには:
gcloud dataproc clusters delete cluster-name \ --region=region
Cloud Storage の jar ファイルを削除するには:
gcloud storage rm gs://bucket-name/HelloWorld.jar
次のコマンドを使用すると、バケットおよびバケットのすべてのフォルダとファイルを削除できます。
gcloud storage rm gs://bucket-name/ --recursive