Dataproc で Spark Scala ジョブを作成して実行する

このチュートリアルでは、Spark Scala ジョブを作成し、Dataproc クラスタに送信するさまざまな方法について説明します。ここで説明する方法は次のとおりです。

  • Scala REPL(Read-Evaluate-Print-Loop またはインタラクティブ インタープリタ)または SBT ビルドツールを使用して、コマンドラインから、ローカルマシンで Spark Scala の「Hello World」アプリを作成しコンパイルする
  • コンパイルした Scala クラスをマニフェストとともに jar ファイルにコンパイルする
  • Dataproc クラスタで実行される Spark ジョブに Scala jar を送信する
  • Google Cloud Console からの Scala ジョブ出力を確認する

このチュートリアルでは、次の方法についても説明します。

  • Dataproc クラスタで、spark-shell REPL を使用して Spark Scala の「WordCount」MapReduce ジョブを直接作成し実行する

  • プレインストールされた Apache Spark と Hadoop のサンプルをクラスタで実行する

Google Cloud Platform プロジェクトの設定

次の手順をまだ行っていない場合は、行います。

  1. プロジェクトを設定する
  2. Cloud Storage バケットを作成する
  3. Dataproc クラスタを作成する

ローカルでの Scala コードの作成とコンパイル

このチュートリアルの簡単な演習として、Scala REPL または SBT コマンドライン インターフェースを使用し、ローカルで「Hello World」Scala アプリを作成します。

Scala を使用する

  1. Scala のインストール ページから Scala バイナリをダウンロードします。
  2. Scala のインストールの手順に沿って、ファイルを解凍し、SCALA_HOME 環境変数を設定して、パスに追加します。次に例を示します。

    export SCALA_HOME=/usr/local/share/scala
    export PATH=$PATH:$SCALA_HOME/
    

  3. Scala REPL を起動します。

    $ scala
    Welcome to Scala version ...
    Type in expressions to have them evaluated.
    Type :help for more information.
    scala>
    

  4. HelloWorld コードをコピーして Scala REPL に貼り付けます。

    object HelloWorld {
      def main(args: Array[String]): Unit = {
        println("Hello, world!")
      }
    }

  5. HelloWorld.scala を保存して REPL を終了します。

    scala> :save HelloWorld.scala
    scala> :q
    

  6. scalac をコンパイルします。

    $ scalac HelloWorld.scala
    

  7. コンパイル済みの .class ファイルを一覧表示します。

    $ ls HelloWorld*.class
    HelloWorld$.class   HelloWorld.class
    

SBT を使用する

  1. SBT をダウンロードします。

  2. 次のように、「HelloWorld」プロジェクトを作成します

    $ mkdir hello
    $ cd hello
    $ echo \
    'object HelloWorld {def main(args: Array[String]) = println("Hello, world!")}' > \
    HelloWorld.scala
    

  3. sbt.build 構成ファイルを作成して「HelloWorld.jar」に artifactName(生成した jar ファイル名は下記に表示)を設定します(デフォルトのアーティファクトの変更をご覧ください)。

    echo \
    'artifactName := { (sv: ScalaVersion, module: ModuleID, artifact: Artifact) =>
    "HelloWorld.jar" }' > \
    build.sbt
    

  4. SBT を起動し、コードを実行します。

    $ sbt
    [info] Set current project to hello ...
    > run
    ... Compiling 1 Scala source to .../hello/target/scala-.../classes...
    ... Running HelloWorld
    Hello, world!
    [success] Total time: 3 s ...
    

  5. コードをメインクラスのエントリ ポイントを指定するマニフェストHelloWorld)とともに jar ファイルにパッケージ化し、終了します。

    > package
    ... Packaging .../hello/target/scala-.../HelloWorld.jar ...
    ... Done packaging.
    [success] Total time: ...
    > exit
    

jar の作成

SBT または jar コマンドを使用して jar ファイルを作成します。

SBT で jar を作成する

SBT の package コマンドで jar ファイルを作成します(SBT を使用するをご覧ください)。

手動で jar を作成する

  1. コンパイル済みの HelloWorld*.class ファイルを含むディレクトリにディレクトリを変更(cd)し、次のコマンドを実行して、メインクラスのエントリ ポイント(HelloWorld)を指定するマニフェストとともにクラスファイルを jar ファイルにパッケージ化します。
    $ jar cvfe HelloWorld.jar HelloWorld HelloWorld*.class
    added manifest
    adding: HelloWorld$.class(in = 637) (out= 403)(deflated 36%)
    adding: HelloWorld.class(in = 586) (out= 482)(deflated 17%)
    

Cloud Storage への jar のコピー

  1. Google Cloud CLI を使用して、jar をプロジェクトの Cloud Storage バケットにコピーします。
$ gcloud storage cp HelloWorld.jar gs://<bucket-name>/
Copying file://HelloWorld.jar [Content-Type=application/java-archive]...
Uploading   gs://bucket-name/HelloWorld.jar:         1.46 KiB/1.46 KiB

Dataproc Spark ジョブに jar を送信する

  1. Google Cloud コンソール を使用して、jar ファイルを Dataproc Spark ジョブに送信します。[ジョブの送信] ページの項目に次のように入力します。

    • [クラスタ]: クラスタリストからクラスタの名前を選択します。
    • [ジョブタイプ]: Spark
    • [メインクラスまたは JAR]: HelloWorld jar への Cloud Storage URI パスを指定します(gs://your-bucket-name/HelloWorld.jar)。

      コードへのエントリ ポイント(「Main-Class: HelloWorld」)を指定するマニフェストが jar に含まれていない場合は、[メインクラスまたは JAR] 項目にメインクラスの名前(「HelloWorld」)を指定し、[JAR ファイル] 項目に jar ファイルへの URI パス(gs://your-bucket-name/HelloWorld.jar)を入力する必要があります。

  2. [送信] をクリックしてジョブを開始します。開始されたジョブが [ジョブ] リストに追加されます。

  3. ジョブ ID をクリックして [ジョブ] ページを開くと、ジョブのドライバ出力が表示されます。

クラスタの spark-shell REPL を使用した Spark Scala コードの作成と実行

Scala アプリは Dataproc クラスタで直接開発することをおすすめします。Hadoop と Spark は Dataproc クラスタにプレインストールされており、Cloud Storage コネクタを使用して構成されています。これにより、Cloud Storage との間で直接データの読み取りや書き込みができます。

この例では、プロジェクトの Dataproc クラスタ マスターノードに SSH 接続し、spark-shell REPL を使用して Scala wordcount mapreduce アプリケーションを作成し実行する方法を示します。

  1. Dataproc クラスタのマスターノードに SSH で接続します。

    1. Google Cloud コンソールでプロジェクトの Dataproc の [クラスタ] ページに移動し、クラスタの名前をクリックします。

    2. クラスタの詳細ページで [VM インスタンス] タブを選択し、クラスタの名前がある行の右側に表示される [SSH] 選択ボタンをクリックします。

      マスターノード上のホーム ディレクトリでブラウザ ウィンドウが開きます。

  2. spark-shell を起動します。

    $ spark-shell
    ...
    Using Scala version ...
    Type in expressions to have them evaluated.
    Type :help for more information.
    ...
    Spark context available as sc.
    ...
    SQL context available as sqlContext.
    scala>
    

  3. 一般に公開されている Cloud Storage にあるシェイクスピア テキスト スニペットから RDD(耐障害性分散データセット)を作成する

    scala> val text_file = sc.textFile("gs://pub/shakespeare/rose.txt")
    

  4. テキストで wordcount mapreduce を実行し、wordcounts の結果を表示します。

    scala> val wordCounts = text_file.flatMap(line => line.split(" ")).map(word =>
    (word, 1)).reduceByKey((a, b) => a + b)
    scala> wordCounts.collect
    ... Array((call,1), (What's,1), (sweet.,1), (we,1), (as,1), (name?,1), (any,1), (other,1),
    (rose,1), (smell,1), (name,1), (a,2), (would,1), (in,1), (which,1), (That,1), (By,1))
    

  5. Cloud Storage の <bucket-name>/wordcounts-out のカウントを保存し、scala-shell を終了します。

    scala> wordCounts.saveAsTextFile("gs://<bucket-name>/wordcounts-out/")
    scala> exit
    

  6. gcloud CLI を使用して出力ファイルを一覧表示し、ファイルの内容を表示する

    $ gcloud storage ls gs://bucket-name/wordcounts-out/
    gs://spark-scala-demo-bucket/wordcounts-out/
    gs://spark-scala-demo-bucket/wordcounts-out/_SUCCESS
    gs://spark-scala-demo-bucket/wordcounts-out/part-00000
    gs://spark-scala-demo-bucket/wordcounts-out/part-00001
    

  7. gs://<bucket-name>/wordcounts-out/part-00000 の内容を確認します。

    $ gcloud storage cat gs://bucket-name/wordcounts-out/part-00000
    (call,1)
    (What's,1)
    (sweet.,1)
    (we,1)
    (as,1)
    (name?,1)
    (any,1)
    (other,1)
    

プレインストールされたサンプルコードの実行

Dataproc のマスターノードには、実行可能な jar ファイルが標準の Apache Hadoop と Spark のサンプルとともに含まれています。

Jar タイプ Master node /usr/lib/ location GitHub のソース Apache ドキュメント
Hadoop hadoop-mapreduce/hadoop-mapreduce-examples.jar ソースリンク MapReduce のチュートリアル
Spark spark/lib/spark-examples.jar ソースリンク Spark のサンプル

クラスタへのコマンドラインからのサンプルの送信

Google Cloud CLI gcloud コマンドライン ツールを使用して、ローカル開発マシンからサンプルを送信できます。(Google Cloud コンソールからジョブを送信するには、Google Cloud コンソールの使用をご覧ください)。

Hadoop WordCount のサンプル

gcloud dataproc jobs submit hadoop --cluster=cluster-name \
    --region=region \
    --jars=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
    --class=org.apache.hadoop.examples.WordCount \
    -- URI of input file URI of output file

Spark WordCount サンプル

gcloud dataproc jobs submit spark --cluster=cluster-name \
    --region=region \
    --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
    --class=org.apache.spark.examples.JavaWordCount \
    -- URI of input file

クラスタのシャットダウン

課金され続けることがないようにするために、クラスタをシャットダウンし、このチュートリアルで使用した Cloud Storage リソース(Cloud Storage バケットとファイル)を削除します。

クラスタをシャットダウンするには:

gcloud dataproc clusters delete cluster-name \
    --region=region

Cloud Storage の jar ファイルを削除するには:

gcloud storage rm gs://bucket-name/HelloWorld.jar

次のコマンドを使用すると、バケットおよびバケットのすべてのフォルダとファイルを削除できます。

gcloud storage rm gs://bucket-name/ --recursive

次のステップ