Apache Spark バッチ ワークロードを実行する

Dataproc Serverless を使用して、必要に応じてリソースをスケールする Dataproc マネージド コンピューティング インフラストラクチャのバッチ ワークロードを送信する方法を学習します。

始める前に

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc API.

    Enable the API

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  6. Make sure that billing is enabled for your Google Cloud project.

  7. Enable the Dataproc API.

    Enable the API

  8. ワークロードを実行するリージョン VPC サブネットで限定公開の Google アクセスが有効になっていることを確認します。詳細については、Spark バッチ ワークロードの送信をご覧ください。

Spark バッチ ワークロードの送信

Google Cloud コンソール、Google Cloud CLI、または Dataproc Serverless API を使用して、Dataproc Serverless for Spark バッチ ワークロードを作成して送信できます。

Console

  1. Google Cloud コンソールで、[Dataproc バッチ] に移動します。

  2. [作成] をクリックします。

  3. 次のフィールドを選択して入力し、pi の近似値を計算する Spark バッチ ワークロードを送信します。

    • バッチ情報:
      • バッチ ID: バッチ ワークロードの ID を指定します。この値は 4 ~ 63 文字にする必要があります。有効な文字は /[a-z][0-9]-/ です。
      • リージョン: ワークロードを実行するリージョンを選択します。
    • コンテナ:
      • バッチタイプ: Spark。
      • ランタイム バージョン: デフォルトのランタイム バージョンが選択されています。必要に応じて、デフォルト以外の Dataproc Serverless ランタイム バージョンを指定できます。
      • メインクラス:
        org.apache.spark.examples.SparkPi
      • jar ファイル(このファイルは Dataproc Serverless Spark 実行環境にプリインストールされています)。
        file:///usr/lib/spark/examples/jars/spark-examples.jar
      • 引数: 1000。
    • 実行構成: ワークロードの実行に使用するサービス アカウントを指定できます。サービス アカウントを指定しない場合、ワークロードは Compute Engine のデフォルトのサービス アカウントで実行されます。サービス アカウントには Dataproc ワーカーのロールが必要です。
    • ネットワーク構成: Dataproc Serverless for Spark ワークロードを実行する VPC サブネットワークは、限定公開の Google アクセス PGAが有効になっており、Dataproc Serverless for Spark のネットワーク構成にリストされているその他の要件を満たす必要があります。

      [プライマリ ネットワーク] セレクタと [サブネットワーク] セレクタには、選択したワークロード リージョンのサブネットで限定公開の Google アクセスが有効になっているネットワークが一覧表示されます。リストからネットワークとサブネットを選択します。ネットワークとサブネットが表示されない場合は、現在選択されているワークロード リージョンの VPC サブネットで限定公開の Google アクセスを有効にするか、ワークロード リージョンを PGA が有効になっているサブネットがリストされているリージョンに変更して、そのネットワークとサブネットを選択します。

    • プロパティ: Spark バッチ ワークロードに設定するサポートされている Spark プロパティKey(プロパティ名)と Value を入力します。注: Compute Engine のクラスタ プロパティの Dataproc とは異なり、Dataproc Serverless for Spark のワークロード プロパティに spark: 接頭辞が含まれていません。

    • その他のオプション:

  4. [送信] をクリックして、Spark バッチ ワークロードを実行します。

gcloud

Spark バッチ ワークロードを送信して pi の近似値を計算するには、次の gcloud CLI の gcloud dataproc batches submit spark コマンドをターミナル ウィンドウまたは Cloud Shell でローカルに実行します。

gcloud dataproc batches submit spark \
    --region=REGION \
    --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
    --class=org.apache.spark.examples.SparkPi \
    -- 1000

注:

  • REGION: ワークロードが実行されるリージョンを指定します。
  • サブネットワーク: Dataproc Serverless for Spark ワークロードを実行する VPC サブネットワークは、限定公開の Google アクセスが有効になっており、Dataproc Serverless for Spark のネットワーク構成にリストされているその他の要件を満たす必要があります。 gcloud dataproc batches submit コマンドで指定されたリージョンの default ネットワークのサブネットが限定公開の Google アクセスで有効になっていない場合、次のいずれかを行う必要があります。
    • 限定公開の Google アクセスでリージョンのデフォルト ネットワークのサブネットを有効にする
    • 限定公開の Google アクセスが有効になっているサブネットを指定するには、--subnet=SUBNET_URI フラグを使用します。ネットワーク内のサブネットの URI を一覧表示するには、gcloud compute networks describe <var>NETWORK_NAME</var> コマンドを実行します。
  • --jars: サンプル JAR ファイルが Spark 実行環境にプリインストールされています。SparkPi ワークロードに渡される 1000 コマンド引数は、円周率の見積もりロジックを 1,000 回繰り返し指定します。(ワークロード入力引数は「--」の後に含まれます)
  • --properties: このフラグを追加して、Spark バッチ ワークロードで使用するサポートされている Spark プロパティを入力できます。
  • --deps-bucket: このフラグを追加して、Dataproc サーバーレスがワークロードの依存関係をアップロードする Cloud Storage バケットを指定できます。バケットの gs:// URI 接頭辞は必要ありません。「mybucketname」のように、バケットのパスまたはバケット名を指定できます。Dataproc Serverless for Spark は、バッチ ワークロードを実行する前に、ローカル ファイルをバケット内の /dependencies フォルダにアップロードします。注: バッチ ワークロードがローカルマシン上のファイルを参照する場合は、このフラグが必要です。
  • --ttl: --ttl フラグを追加して、バッチの有効期間の長さを指定できます。ワークロードがこの期間を超えると、進行中の作業の終了を待たずに無条件に終了します。smhd(秒、分、時間、日)の接尾辞を使用して期間を指定します。最小値は 10 分(10m)、最大値は 14 日(14d)です。
    • 1.1 または 2.0 ランタイム バッチ: 1.1 または 2.0 ランタイム バッチ ワークロードで --ttl が指定されていない場合、ワークロードは自然に終了するまで実行されます(終了しない場合、永続的に実行されます)。
    • 2.1 以降のランタイム バッチ: 2.1 以降のランタイム バッチ ワークロードで --ttl が指定されていない場合、デフォルトは 4h です。
  • --service-account: ワークロードの実行に使用するサービス アカウントを指定できます。サービス アカウントを指定しない場合、ワークロードは Compute Engine のデフォルトのサービス アカウントで実行されます。サービス アカウントには Dataproc ワーカーのロールが必要です。
  • その他のオプション: gcloud dataproc batches submit spark フラグを追加して、他のワークロード オプションと Spark プロパティを指定できます。
    • Hive メタストア: 次のコマンドは、標準の Spark 構成を使用する外部のセルフマネージド Hive メタストアを使用するようにバッチ ワークロードを構成します。
      gcloud dataproc batches submit spark\
          --properties=spark.sql.catalogImplementation=hive,spark.hive.metastore.uris=METASTORE_URI,spark.hive.metastore.warehouse.dir=WAREHOUSE_DIR> \
          other args ...
              
    • 永続的履歴サーバー:
      1. 次のコマンドは、単一ノードの Dataproc クラスタに PHS を作成します。PHS は、バッチ ワークロードを実行するリージョンに配置する必要があり、Cloud Storage bucket-name が存在している必要があります。
        gcloud dataproc clusters create PHS_CLUSTER_NAME \
            --region=REGION \
            --single-node \
            --enable-component-gateway \
            --properties=spark:spark.history.fs.logDirectory=gs://bucket-name/phs/*/spark-job-history
                     
      2. 実行中の永続履歴サーバーを指定してバッチ ワークロードを送信します。
        gcloud dataproc batches submit spark \
            --region=REGION \
            --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
            --class=org.apache.spark.examples.SparkPi \
            --history-server-cluster=projects/project-id/regions/region/clusters/PHS-cluster-name \
            -- 1000
                      
    • ランタイム バージョン: --version フラグを使用して、ワークロードの Dataproc サーバーレス ランタイム バージョンを指定します。
      gcloud dataproc batches submit spark \
          --region=REGION \
          --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
          --class=org.apache.spark.examples.SparkPi \
          --version=VERSION
          -- 1000
                  

API

このセクションでは、Dataproc Serverless for Spark の batches.create を使用して、pi の近似値を計算するバッチ ワークロードを作成する方法について説明します。

リクエストのデータを使用する前に、次のように置き換えます。

  • project-id: Google Cloud プロジェクト ID。
  • region: Dataproc Serverless がワークロードを実行する Compute Engine リージョン
  • 注:
    • RuntimeConfig.containerImage: Docker イメージの命名形式 {hostname}/{project-id}/{image}:{tag} を使用して、カスタム コンテナ イメージを指定できます(例: gcr.io/my-project-id/my-image:1.0.1)。注: カスタム コンテナは Container Registry でホストする必要があります。
    • ExecutionConfig.subnetworkUri: Dataproc Serverless for Spark ワークロードを実行する VPC サブネットワークは、限定公開の Google アクセスが有効になっており、Dataproc Serverless for Spark のネットワーク構成にリストされているその他の要件を満たす必要があります。 指定されたリージョンの default ネットワークのサブネットが限定公開の Google アクセスで有効になっていない場合、次のいずれかを行う必要があります。
      1. 限定公開の Google アクセスでリージョンのデフォルト ネットワークのサブネットを有効にする
      2. ExecutionConfig.subnetworkUri フィールドを使用して、限定公開の Google アクセスが有効になっているサブネットを指定します。ネットワーク内のサブネットの URI を一覧表示するには、gcloud compute networks describe [NETWORK_NAME] コマンドを実行します。
    • sparkBatch.jarFileUris: サンプルの jar ファイルは、Spark 実行環境にプリインストールされています。「1000」の sparkBatch.args が SparkPi ワークロードに渡されると、pi の見積もりロジックを 1000 回反復するように指定します。
    • RuntimeConfig.properties: このフィールドを使用して、Spark バッチ ワークロードで使用するサポートされている Spark プロパティを入力できます。
    • ExecutionConfig.serviceAccount: ワークロードの実行に使用するサービス アカウントを指定できます。サービス アカウントを指定しない場合、ワークロードは Compute Engine のデフォルトのサービス アカウントで実行されます。サービス アカウントには Dataproc ワーカーのロールが必要です。
    • EnvironmentConfig.ttl: このフィールドを使用して、バッチの有効期間の長さを指定できます。ワークロードがこの期間を超えると、進行中の作業の終了を待たずに無条件に終了します。[期間] で期間を JSON 表現として指定します。 最小値は 10 分、最大値は 14 日です。
      • 1.1 または 2.0 ランタイム バッチ: 1.1 または 2.0 ランタイム バッチ ワークロードで --ttl が指定されていない場合、ワークロードは自然に終了するまで実行されます(終了しない場合、永続的に実行されます)。
      • 2.1 以降のランタイム バッチ: 2.1 以降のランタイム バッチ ワークロードで --ttl が指定されていない場合、デフォルトは 4 時間です。
    • その他の方法:

    HTTP メソッドと URL:

    POST https://dataproc.googleapis.com/v1/projects/project-id/locations/region/batches

    リクエストの本文(JSON):

    {
      "sparkBatch":{
        "args":[
          "1000"
        ],
        "jarFileUris":[
          "file:///usr/lib/spark/examples/jars/spark-examples.jar"
        ],
        "mainClass":"org.apache.spark.examples.SparkPi"
      }
    }
    

    リクエストを送信するには、次のいずれかのオプションを展開します。

    次のような JSON レスポンスが返されます。

    {
    "name":"projects/project-id/locations/region/batches/batch-id",
      "uuid":",uuid",
      "createTime":"2021-07-22T17:03:46.393957Z",
      "sparkBatch":{
        "mainClass":"org.apache.spark.examples.SparkPi",
        "args":[
          "1000"
        ],
        "jarFileUris":[
          "file:///usr/lib/spark/examples/jars/spark-examples.jar"
        ]
      },
      "runtimeInfo":{
        "outputUri":"gs://dataproc-.../driveroutput"
      },
      "state":"SUCCEEDED",
      "stateTime":"2021-07-22T17:06:30.301789Z",
      "creator":"account-email-address",
      "runtimeConfig":{
        "properties":{
          "spark:spark.executor.instances":"2",
          "spark:spark.driver.cores":"2",
          "spark:spark.executor.cores":"2",
          "spark:spark.app.name":"projects/project-id/locations/region/batches/batch-id"
        }
      },
      "environmentConfig":{
        "peripheralsConfig":{
          "sparkHistoryServerConfig":{
          }
        }
      },
      "operation":"projects/project-id/regions/region/operation-id"
    }
    

ワークロードの費用を見積もる

Dataproc Serverless for Spark ワークロードは、データ コンピューティング ユニット(DCU)とシャッフル ストレージ リソースを消費します。Dataproc Serverless の料金で、Dataproc の UsageMetrics を出力してワークロード リソースの消費とコストを見積もる例をご紹介しています。

次のステップ

以下の内容について学習します。