Cloud Storage コネクタ

Cloud Storage コネクタのオープンソースの Java ライブラリを使用すると、Cloud Storage 内のデータに対して Apache Hadoop ジョブまたは Apache Spark ジョブを直接実行できます。

Cloud Storage コネクタのメリット

  • データへの直接アクセス: データを Cloud Storage に保存し、直接アクセスします。最初にデータを HDFS に転送する必要はありません。
  • HDFS との互換性: hdfs:// 接頭辞の代わりに gs:// を使用して、Cloud Storage 内のデータにアクセスできます。
  • 相互運用性: Cloud Storage にデータを保存することにより、Spark、Hadoop、Google サービス間でシームレスな相互運用性を実現できます。
  • データ アクセシビリティ: HDFS と異なり、Hadoop クラスタをシャットダウンしても、Cloud Storage 内のデータに引き続きアクセスできます。
  • データの高可用性: Cloud Storage に保存されているデータの可用性は高く、グローバルに複製してもパフォーマンスが低下しません。
  • ストレージ管理のオーバーヘッドが不要: HDFS と異なり、Cloud Storage はファイル システムのチェック、ファイル システムのアップグレードや旧バージョンへのロールバックなどの定期的なメンテナンスを必要としません。
  • クイック起動: HDFS では、NameNode がセーフモードでなくなるまで MapReduce ジョブを起動できません。この処理は、データのサイズや状態に応じて、数秒で終わるときもあれば、何分もかかるときもあります。Cloud Storage を使用することで、タスクノードの開始と同時にジョブを開始することができ、長期的にも大幅なコスト節減につながります。

Dataproc クラスタでのコネクタの設定

Cloud Storage コネクタは、デフォルトで /usr/local/share/google/dataproc/lib/ ディレクトリのすべての Dataproc クラスタノードにインストールされます。次のサブセクションでは、Dataproc クラスタでコネクタの設定を完了する手順について説明します。

VM サービス アカウント

Dataproc クラスタノードや他の Compute Engine VM でコネクタを実行すると、google.cloud.auth.service.account.enable プロパティはデフォルトで false に設定されます。つまり、コネクタの VM サービス アカウントの認証情報を構成する必要はありません。VM サービス アカウントの認証情報は VM メタデータ サーバーから提供されます。

Dataproc の VM サービス アカウントには、Cloud Storage バケットにアクセスするための権限が必要です。

ユーザーが選択したコネクタ バージョン

Dataproc クラスタにインストールされている最新のイメージで使用されるデフォルトの Cloud Storage コネクタのバージョンは、イメージ バージョン ページにリストされています。アプリケーションが、クラスタにデプロイされた、デフォルト以外のコネクタ バージョンに依存している場合は、次のいずれかの操作を実行して、選択したコネクタ バージョンを使用できます。

  • --metadata=GCS_CONNECTOR_VERSION=x.y.z フラグを使用してクラスタを作成します。このフラグは、クラスタ上で実行されているアプリケーションが使用するコネクタを、指定されたコネクタ バージョンに更新します。
  • 使用しているバージョンのコネクタ クラスとコネクタの依存関係をアプリケーションの jar に含めて再配置します。デプロイされたコネクタ バージョンと Dataproc クラスタにインストールされているデフォルトのコネクタ バージョンとの競合を回避するために、再配置が必要です。Maven 依存関係の再配置例もご覧ください。

Dataproc 以外のクラスタでのコネクタの設定

次の手順で、オンプレミスの HDFS データを Cloud Storage に移動するために使用する Apache Hadoop クラスタや Spark クラスタなど、Dataproc 以外のクラスタに Cloud Storage コネクタを設定できます。

  1. コネクタをダウンロードします。

  2. コネクタをインストールします。

    GitHub の手順に沿って、Cloud Storage コネクタをインストール、構成、テストします。

コネクタの使用

このコネクタを使用して、次の方法で Cloud Storage データにアクセスできます。

  • gs:// 接頭辞を使用する Spark、PySpark、Hadoop アプリケーション
  • hadoop fs -ls gs://bucket/dir/file を使用して Hadoop シェルで
  • Cloud Storage のブラウザで、Google Cloud コンソールで
  • Google Cloud SDK コマンドを使用する。たとえば、次のコマンドを使用します。 * gcloud storage cp * gcloud storage rsync

Java の使用

Cloud Storage コネクタには Java 8 が必要です。

次の例は、Cloud Storage コネクタの Maven POM 依存関係管理セクションです。詳細については、依存関係管理をご覧ください。

<dependency>
    <groupId>com.google.cloud.bigdataoss</groupId>
    <artifactId>gcs-connector</artifactId>
    <version>hadoopX-X.X.XCONNECTOR VERSION</version>
    <scope>provided</scope>
</dependency>

シェーディングされたバージョンの場合:

<dependency>
    <groupId>com.google.cloud.bigdataoss</groupId>
    <artifactId>gcs-connector</artifactId>
    <version>hadoopX-X.X.XCONNECTOR VERSION</version>
    <scope>provided</scope>
    <classifier>shaded</classifier>
</dependency>

コネクタのサポート

Google Cloud プロダクトやユースケースで使用される Cloud Storage コネクタは、Google Cloud でサポートされます。Dataproc で使用する場合、Dataproc と同じレベルでサポートされます。詳細については、サポートを利用するをご覧ください。

gRPC を使用して Cloud Storage に接続する

デフォルトでは、Dataproc の Cloud Storage コネクタは Cloud Storage JSON API を使用します。このセクションでは、Cloud Storage コネクタで gRPC を使用するように設定する方法について説明します。

使用に際しての考慮事項

gRPC で Cloud Storage コネクタを使用する場合は、次の点を考慮してください。

  • リージョン バケットのロケーション: gRPC で読み取りレイテンシを短縮できるのは、Compute Engine VM と Cloud Storage バケットが同じ Compute Engine リージョンにある場合のみです。
  • 読み取り負荷の高いジョブ: gRPC は、長時間実行される読み取りの読み取りレイテンシを改善し、読み取り負荷の高いワークロードに役立ちます。gRPC チャネルを作成して短い計算を実行し、チャネルを閉じるアプリケーションはおすすめしません。
  • 未認証のリクエスト: gRPC は未認証のリクエストをサポートしていません。

要件

Cloud Storage コネクタで gRPC を使用する場合は、次の要件が適用されます。

  • Dataproc クラスタの VPC ネットワークは、直接接続をサポートしている必要があります。つまり、ネットワークのルートファイアウォール ルールで、下り(外向き)トラフィックが 34.126.0.0/182001:4860:8040::/42 に到達できるようにする必要があります。

  • Dataproc クラスタを作成する場合は、イメージ バージョン 2.1.56+ で Cloud Storage コネクタ バージョン 2.2.23 以降、またはイメージ バージョン 2.2.0 以降で Cloud Storage コネクタ バージョン v3.0.0 以降を使用する必要があります。各 Dataproc イメージ バージョンにインストールされている Cloud Storage コネクタのバージョンは、Dataproc イメージ バージョンのページに記載されています。

    • gRPC Cloud Storage リクエストに Dataproc on GKE 仮想クラスタを作成して使用する場合は、gke-metadata-server 0.4.285 を含む GKE バージョン 1.28.5-gke.1199000 をおすすめします。この組み合わせは直接接続をサポートしています。
  • ユーザーまたは組織の管理者は、Cloud Storage コネクタへの gRPC リクエストの設定と送信に必要な権限を含む Identity and Access Management ロールを付与する必要があります。これらのロールには次のものがあります。

    • ユーザーロール: クラスタの作成とジョブの送信を許可する Dataproc 編集者ロールがユーザーに付与されます。
    • サービス アカウントのロール: クラスタ VM で実行されるアプリケーションが Cloud Storage オブジェクトの表示、読み取り、作成、書き込みを行えるように、Dataproc の VM サービス アカウントに付与される Storage オブジェクト ユーザーロール。

Cloud Storage コネクタで gRPC を有効にする

Cloud Storage コネクタで gRPC を有効にするには、クラスタまたはジョブレベルで有効にします。クラスタで有効にすると、Cloud Storage コネクタの読み取りリクエストで gRPC が使用されます。クラスタレベルではなくジョブで有効にした場合、Cloud Storage コネクタの読み取りリクエストは、ジョブでのみ gRPC を使用します。

クラスタを有効にする

クラスタレベルで Cloud Storage コネクタで gRPC を有効にするには、Dataproc クラスタを作成するときに core:fs.gs.client.type=STORAGE_CLIENT プロパティを設定します。クラスタ レベルで gRPC が有効になると、クラスタで実行されているジョブによって行われた Cloud Storage コネクタの読み取りリクエストで gRPC が使用されます。

gcloud CLI の例:

gcloud dataproc clusters create CLUSTER_NAME \
    --project=PROJECT_ID \
    --region=REGION \
    --properties=core:fs.gs.client.type=STORAGE_CLIENT

次のように置き換えます。

  • CLUSTER_NAME: クラスタの名前を指定します。
  • PROJECT_NAME: クラスタが配置されているプロジェクトのプロジェクト ID。プロジェクト ID は、Google Cloud コンソールのダッシュボードの [プロジェクト情報] セクションに表示されます。
  • REGION: クラスタが配置される Compute Engine リージョンを指定します。

ジョブを有効にする

特定のジョブの Cloud Storage コネクタで gRPC を有効にするには、ジョブを送信するときに --properties=spark.hadoop.fs.gs.client.type=STORAGE_CLIENT を含めます。

例: gRPC を使用して Cloud Storage から読み取る既存のクラスタでジョブを実行します。

  1. gRPC を使用して Cloud Storage テキスト ファイルを読み取り、ファイル内の行数を出力するローカルの /tmp/line-count.py PySpark スクリプトを作成します。

    cat <<EOF >"/tmp/line-count.py"
    #!/usr/bin/python
    import sys
    from pyspark.sql import SparkSession
    path = sys.argv[1]
    spark = SparkSession.builder.getOrCreate()
    rdd = spark.read.text(path)
    lines_counter = rdd.count()
    print("There are {} lines in file: {}".format(lines_counter,path))
    EOF
    
  2. ローカルの /tmp/line-count-sample.txt テキスト ファイルを作成します。

    cat <<EOF >"/tmp/line-count-sample.txt"
    Line 1
    Line 2
    line 3
    EOF
    
  3. ローカルの /tmp/line-count.py/tmp/line-count-sample.txt を Cloud Storage のバケットにアップロードします。

    gcloud storage cp /tmp/line-count* gs://BUCKET
    
  4. クラスタで line-count.py ジョブを実行します。Cloud Storage コネクタの読み取りリクエストで gRPC を有効にするには、--properties=spark.hadoop.fs.gs.client.type=STORAGE_CLIENT を設定します。

    gcloud dataproc jobs submit pyspark gs://BUCKET/line-count.py \
    --cluster=CLUSTER_NAME \
    --project=PROJECT_ID  \
    --region=REGION \
    --properties=spark.hadoop.fs.gs.client.type=STORAGE_CLIENT \
    -- gs://BUCKET/line-count-sample.txt
    

    次のように置き換えます。

    • CLUSTER_NAME: 既存のクラスタの名前。
    • PROJECT_NAME: プロジェクト ID。プロジェクト ID は、Google Cloud コンソールのダッシュボードの [プロジェクト情報] セクションに表示されます。
    • REGION: クラスタが配置されている Compute Engine リージョン。
    • BUCKET: Cloud Storage バケット。

gRPC クライアントサイドの指標を生成する

Cloud Storage コネクタを構成して、Cloud Monitoring で gRPC 関連の指標を生成できます。gRPC 関連の指標は、次の作業に役立ちます。

  • Cloud Storage への gRPC リクエストのパフォーマンスをモニタリングして最適化する
  • 問題のトラブルシューティングとデバッグ
  • アプリの使用状況と動作に関する分析情報を取得する

gRPC 関連の指標を生成するように Cloud Storage コネクタを構成する方法については、gRPC クライアントサイド指標を使用するをご覧ください。

関連情報