Cloud Storage コネクタ

Cloud Storage コネクタのオープンソースの Java ライブラリを使用すると、Cloud Storage のデータに対して Apache Hadoop ジョブまたは Apache Spark ジョブを直接実行できます。

Cloud Storage コネクタのメリット

  • データへの直接アクセス: データを Cloud Storage に保存し、直接アクセスします。最初にデータを HDFS に転送する必要はありません。
  • HDFS との互換性: hdfs:// 接頭辞の代わりに gs:// を使用して、Cloud Storage 内のデータにアクセスできます。
  • 相互運用性: Cloud Storage にデータを保存することにより、Spark、Hadoop、Google サービス間でシームレスな相互運用性を実現できます。
  • データ アクセシビリティ: HDFS と異なり、Hadoop クラスタをシャットダウンしても、Cloud Storage 内のデータに引き続きアクセスできます。
  • データの高可用性: Cloud Storage に保存されているデータの可用性は高く、グローバルに複製してもパフォーマンスが低下しません。
  • ストレージ管理のオーバーヘッドが不要: HDFS と異なり、Cloud Storage はファイル システムのチェック、ファイル システムのアップグレードや旧バージョンへのロールバックなどの定期的なメンテナンスを必要としません。
  • クイック起動: HDFS では、NameNode がセーフモードでなくなるまで MapReduce ジョブを起動できません。この処理は、データのサイズや状態に応じて、数秒で終わるときもあれば、何分もかかるときもあります。Cloud Storage を使用することで、タスクノードの開始と同時にジョブを開始することができ、長期的にも大幅なコスト節減につながります。

Dataproc クラスタでのコネクタの設定

Cloud Storage コネクタは、デフォルトで /usr/local/share/google/dataproc/lib/ ディレクトリのすべての Dataproc クラスタノードにインストールされます。以降のサブセクションでは、Dataproc クラスタでコネクタの設定を完了する手順について説明します。

VM サービス アカウント

Dataproc クラスタノードや他の Compute Engine VM でコネクタを実行すると、google.cloud.auth.service.account.enable プロパティはデフォルトで false に設定されます。したがって、コネクタの VM サービス アカウントの認証情報を構成する必要はありません。VM サービス アカウントの認証情報は VM メタデータ サーバーから提供されます。

Dataproc VM サービス アカウントには、Cloud Storage バケットにアクセスする権限が必要です。

ユーザーが選択したコネクタ バージョン

Dataproc クラスタにインストールされている最新のイメージで使用されるデフォルトの Cloud Storage コネクタのバージョンは、イメージ バージョン ページにリストされています。アプリケーションが、クラスタにデプロイされた、デフォルト以外のコネクタ バージョンに依存している場合は、次のいずれかを行って、選択したコネクタ バージョンを使用できます。

  • --metadata=GCS_CONNECTOR_VERSION=x.y.z フラグを使用してクラスタを作成します。このフラグは、クラスタ上で実行されているアプリケーションが使用するコネクタを、指定されたコネクタ バージョンに更新します。
  • 使用しているバージョンのコネクタ クラスとコネクタの依存関係をアプリケーションの jar に含めて再配置します。再配置は、デプロイされたコネクタのバージョンと Dataproc クラスタにインストールされているデフォルトのコネクタのバージョンとの競合を回避するために必要です。Maven 依存関係の再配置の例もご覧ください。

Dataproc 以外のクラスタでのコネクタの設定

オンプレミスの HDFS データを Cloud Storage に移動するために使用する Apache Hadoop クラスタや Spark クラスタなど、Dataproc 以外のクラスタに Cloud Storage コネクタを設定するには、次の手順を行います。

  1. コネクタをダウンロードします。

  2. コネクタをインストールします。

    GitHub の手順に沿って、Cloud Storage コネクタをインストール、構成、テストします。

コネクタの使用

このコネクタを使用して、次の方法で Cloud Storage データにアクセスできます。

  • gs:// 接頭辞を使用する Spark、PySpark、Hadoop アプリケーション
  • hadoop fs -ls gs://bucket/dir/file を使用した Hadoop シェルで
  • Cloud Storage のブラウザ Google Cloud コンソールで
  • Google Cloud SDK コマンド(次のようなコマンド)を使用します。

Java の使用状況

Cloud Storage コネクタには Java 8 が必要です。

Cloud Storage コネクタの Maven POM 依存関係管理セクションのサンプルを次に示します。詳細については、依存関係の管理をご覧ください。

<dependency>
    <groupId>com.google.cloud.bigdataoss</groupId>
    <artifactId>gcs-connector</artifactId>
    <version>hadoopX-X.X.XCONNECTOR VERSION</version>
    <scope>provided</scope>
</dependency>

シェーディングされたバージョン:

<dependency>
    <groupId>com.google.cloud.bigdataoss</groupId>
    <artifactId>gcs-connector</artifactId>
    <version>hadoopX-X.X.XCONNECTOR VERSION</version>
    <scope>provided</scope>
    <classifier>shaded</classifier>
</dependency>

コネクタのサポート

Cloud Storage コネクタは、Google Cloud プロダクトとユースケースで使用するために Google Cloud でサポートされています。Dataproc と併用すると、Dataproc と同じレベルでサポートされます。詳細については、サポートを利用するをご覧ください。

gRPC を使用して Cloud Storage に接続する

デフォルトでは、Dataproc の Cloud Storage コネクタは Cloud Storage JSON API を使用します。このセクションでは、Cloud Storage コネクタで gRPC を使用できるようにする方法について説明します。

使用に際しての考慮事項

gRPC で Cloud Storage コネクタを使用する際の考慮事項は次のとおりです。

  • リージョン バケットのロケーション: gRPC は、Compute Engine VM と Cloud Storage バケットが同じ Compute Engine リージョンにある場合にのみ、読み取りレイテンシを短縮できます。
  • 読み取り負荷の高いジョブ: gRPC は、長時間実行される読み取りの読み取りレイテンシを改善し、読み取り負荷の高いワークロードに役立ちます。gRPC チャネルを作成し、短い計算を実行してからチャネルを閉じるアプリケーションにはおすすめしません。
  • 認証されていないリクエスト: gRPC は認証されていないリクエストをサポートしていません。

要件

Cloud Storage コネクタで gRPC を使用する場合は、次の要件が適用されます。

  • Dataproc クラスタの VPC ネットワークは、直接接続をサポートしている必要があります。つまり、ネットワークのルートファイアウォール ルールで、34.126.0.0/182001:4860:8040::/42 にアクセスする下り(外向き)トラフィックを許可する必要があります。

  • Dataproc クラスタを作成する場合は、イメージ バージョン 2.1.56+ で Cloud Storage コネクタ バージョン 2.2.23 以降を使用するか、イメージ バージョン 2.2.0 以降で Cloud Storage コネクタ バージョン v3.0.0 以降を使用する必要があります。各 Dataproc イメージ バージョンにインストールされている Cloud Storage コネクタ バージョンは、Dataproc イメージ バージョンのページに記載されています。

    • gRPC Cloud Storage リクエストに Dataproc on GKE 仮想クラスタを作成して使用する場合は、gke-metadata-server 0.4.285 を含む GKE バージョン 1.28.5-gke.1199000 をおすすめします。この組み合わせは直接接続をサポートしています。
  • ユーザーまたは組織の管理者は、Cloud Storage コネクタを設定して gRPC リクエストを行うために必要な権限を含む Identity and Access Management ロールを付与する必要があります。これらのロールには、次のものがあります。

    • ユーザーロール: ユーザーに付与される Dataproc 編集者ロール。ユーザーがクラスタを作成してジョブを送信できるようにします。
    • サービス アカウントのロール: クラスタ VM で実行されているアプリケーションが Cloud Storage オブジェクトを表示、読み取り、作成、書き込みできるように、Dataproc VM サービス アカウントに付与される Storage オブジェクト ユーザーのロール。

Cloud Storage コネクタで gRPC を有効にする

Cloud Storage コネクタで gRPC を有効にするには、クラスタレベルまたはジョブレベルで設定します。クラスタで有効にすると、Cloud Storage コネクタの読み取りリクエストで gRPC が使用されます。クラスタレベルではなくジョブで有効にした場合、Cloud Storage コネクタの読み取りリクエストは、ジョブでのみ gRPC を使用します。

クラスタを有効にする

クラスタレベルで Cloud Storage コネクタで gRPC を有効にするには、Dataproc クラスタを作成するときに core:fs.gs.client.type=STORAGE_CLIENT プロパティを設定します。クラスタ レベルで gRPC が有効になると、クラスタで実行されているジョブによって行われた Cloud Storage コネクタの読み取りリクエストは gRPC を使用します。

gcloud CLI の例:

gcloud dataproc clusters create CLUSTER_NAME \
    --project=PROJECT_ID \
    --region=REGION \
    --properties=core:fs.gs.client.type=STORAGE_CLIENT

次のように置き換えます。

  • CLUSTER_NAME: クラスタの名前を指定します。
  • PROJECT_NAME: クラスタが配置されているプロジェクトのプロジェクト ID。プロジェクト ID は、 Google Cloud コンソールのダッシュボードの [プロジェクト情報] セクションに表示されます。
  • REGION: クラスタが配置される Compute Engine リージョンを指定します。

ジョブを有効にする

特定のジョブの Cloud Storage コネクタで gRPC を有効にするには、ジョブを送信するときに --properties=spark.hadoop.fs.gs.client.type=STORAGE_CLIENT を含めます。

例: gRPC を使用して Cloud Storage から読み取る既存のクラスタでジョブを実行します。

  1. gRPC を使用して Cloud Storage テキスト ファイルを読み取り、ファイル内の行数を出力するローカル /tmp/line-count.py PySpark スクリプトを作成します。

    cat <<EOF >"/tmp/line-count.py"
    #!/usr/bin/python
    import sys
    from pyspark.sql import SparkSession
    path = sys.argv[1]
    spark = SparkSession.builder.getOrCreate()
    rdd = spark.read.text(path)
    lines_counter = rdd.count()
    print("There are {} lines in file: {}".format(lines_counter,path))
    EOF
    
  2. ローカルの /tmp/line-count-sample.txt テキスト ファイルを作成します。

    cat <<EOF >"/tmp/line-count-sample.txt"
    Line 1
    Line 2
    line 3
    EOF
    
  3. ローカルの /tmp/line-count.py/tmp/line-count-sample.txt を Cloud Storage のバケットにアップロードします。

    gcloud storage cp /tmp/line-count* gs://BUCKET
    
  4. クラスタで line-count.py ジョブを実行します。--properties=spark.hadoop.fs.gs.client.type=STORAGE_CLIENT を設定して、Cloud Storage コネクタの読み取りリクエストで gRPC を有効にします。

    gcloud dataproc jobs submit pyspark gs://BUCKET/line-count.py \
    --cluster=CLUSTER_NAME \
    --project=PROJECT_ID  \
    --region=REGION \
    --properties=spark.hadoop.fs.gs.client.type=STORAGE_CLIENT \
    -- gs://BUCKET/line-count-sample.txt
    

    次のように置き換えます。

    • CLUSTER_NAME: 既存のクラスタの名前。
    • PROJECT_NAME: プロジェクト ID。プロジェクト ID は、 Google Cloud コンソールのダッシュボードの [プロジェクト情報] セクションに表示されます。
    • REGION: クラスタが配置されている Compute Engine リージョン。
    • BUCKET: Cloud Storage バケット。

gRPC クライアントサイドの指標を生成する

Cloud Monitoring で gRPC 関連の指標を生成するように Cloud Storage コネクタを構成できます。gRPC 関連の指標は、次の作業に役立ちます。

  • Cloud Storage への gRPC リクエストのパフォーマンスをモニタリングして最適化する
  • 問題のトラブルシューティングとデバッグを行う
  • アプリケーションの使用状況と動作に関する分析情報を取得する

gRPC 関連の指標を生成するように Cloud Storage コネクタを構成する方法については、gRPC クライアントサイドの指標を使用するをご覧ください。

リソース