Dataproc で BigQuery metastore を使用する
このドキュメントでは、 Compute Engine 上の Dataproc で BigQuery metastore を使用する方法について説明します。この接続により、Apache Spark などのオープンソース ソフトウェア エンジン間で機能する単一の共有メタストアが提供されます。
始める前に
- Google Cloud プロジェクトに対する課金を有効にします。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。
BigQuery API と Dataproc API を有効にします。
省略可: BigQuery metastore の仕組みと使用すべき理由を理解します。
必要なロール
メタデータ ストアとして BigQuery metastore で Spark と Dataproc を使用するのに必要な権限を取得するには、次の IAM ロールを付与するよう管理者に依頼してください。
-
Dataproc クラスタを作成します。プロジェクトの Compute Engine デフォルトのサービス アカウントに対する Dataproc ワーカー(
roles/dataproc.worker
) -
Spark で BigQuery metastore テーブルを作成する:
-
プロジェクトの Dataproc VM サービス アカウントに対する Dataproc ワーカー(
roles/dataproc.worker
) -
プロジェクトの Dataproc VM サービス アカウントに対する BigQuery データ編集者(
roles/bigquery.dataEditor
) -
プロジェクトの Dataproc VM サービス アカウントに対するStorage オブジェクト管理者(
roles/storage.objectAdmin
)
-
プロジェクトの Dataproc VM サービス アカウントに対する Dataproc ワーカー(
-
BigQuery で BigQuery metastore テーブルに対してクエリを実行する:
-
プロジェクトに対する BigQuery データ閲覧者(
roles/bigquery.dataViewer
) -
プロジェクトに対する BigQuery ユーザー(
roles/bigquery.user
) -
プロジェクトに対する Storage オブジェクト閲覧者(
roles/storage.objectViewer
)
-
プロジェクトに対する BigQuery データ閲覧者(
ロールの付与については、プロジェクト、フォルダ、組織へのアクセス権の管理をご覧ください。
必要な権限は、カスタムロールや他の事前定義ロールから取得することもできます。
全般的なワークフロー
BigQuery metastore で Compute Engine 上の Dataproc を使用する一般的な手順は次のとおりです。
- Dataproc クラスタを作成するか、既存のクラスタを構成します。
- 任意のオープンソース ソフトウェア エンジン(Spark など)に接続します。
- JAR ファイルを使用して、Apache Iceberg カタログ プラグインをクラスタにインストールします。
- 使用しているオープンソース ソフトウェア エンジンに応じて、必要な BigQuery Metastore リソースを作成して管理します。
- BigQuery で BigQuery メタストア リソースにアクセスして使用します。
BigQuery metastore を Spark に接続する
次の手順では、インタラクティブな Spark SQL を使用して Dataproc を BigQuery metastore に接続します。
Iceberg カタログ プラグインをダウンロードする
BigQuery metastore を Dataproc と Spark に接続するには、BigQuery metastore Iceberg カタログ プラグイン jar ファイルを使用する必要があります。
このファイルは、Dataproc イメージ バージョン 2.2 にデフォルトで含まれています。Dataproc クラスタがインターネットに直接アクセスできない場合は、プラグインをダウンロードして、Dataproc クラスタがアクセスできる Cloud Storage バケットにアップロードする必要があります。
BigQuery metastore Apache Iceberg カタログ プラグインをダウンロードします。
Dataproc クラスタを構成する
BigQuery metastore に接続する前に、Dataproc クラスタを設定する必要があります。
これを行うには、新しいクラスタを作成するか、既存のクラスタを使用します。その後、このクラスタを使用してインタラクティブな Spark SQL を実行し、BigQuery metastore リソースを管理します。
クラスタが作成されるリージョンのサブネットで、プライベート Google アクセス(PGA)を有効にする必要があります。デフォルトでは、2.2(デフォルト)以降のイメージ バージョンで作成された Dataproc クラスタ VM には、内部 IP アドレスのみがあります。クラスタ VM が Google API と通信できるようにするには、クラスタが作成されたリージョンの
default
(またはユーザー指定のネットワーク名)のネットワーク サブネットでプライベート Google アクセスを有効にします。このガイドの Zeppelin ウェブ インターフェースの例を実行する場合は、Zeppelin オプション コンポーネントが有効になっている Dataproc クラスタを使用または作成する必要があります。
新しいクラスタ
新しい Dataproc クラスタを作成するには、次の gcloud
dataproc clusters create
コマンドを実行します。この構成には、BigQuery metastore の使用に必要な設定が含まれています。
gcloud dataproc clusters create CLUSTER_NAME \ --project=PROJECT_ID \ --region=LOCATION \ --optional-components=ZEPPELIN \ --enable-component-gateway \ --single-node
次のように置き換えます。
CLUSTER_NAME
: Dataproc クラスタの名前。PROJECT_ID
: クラスタを作成する Google Cloud プロジェクトの ID。LOCATION
: クラスタを作成する Google Cloud リージョン。
既存のクラスタ
既存のクラスタを構成するには、次の Iceberg Spark ランタイムをクラスタに追加します。
org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2
ランタイムは、次のいずれかの方法で追加できます。
初期化スクリプト。作成時に実行されるカスタム初期化スクリプトにランタイム依存関係を追加します。
スクリプトにランタイム依存関係を追加したら、手順に沿ってクラスタの作成、再作成、更新を行います。
手動インストール。Iceberg カタログ プラグイン JAR ファイルを手動で追加し、クラスタにランタイムが含まれるように Spark プロパティを構成します。
Spark ジョブを送信する
Spark ジョブを送信するには、次のいずれかの方法を使用します。
gcloud CLI
gcloud dataproc jobs submit spark-sql \ --project=PROJECT_ID \ --cluster=CLUSTER_NAME \ --region==REGION \ --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.5.2/iceberg-spark-runtime-3.5_2.12-1.5.2.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.1-beta.jar \ --properties=spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \ spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \ spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \ spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \ spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \ --execute="SPARK_SQL_COMMAND"
次のように置き換えます。
PROJECT_ID
: Dataproc クラスタを含む Google Cloud プロジェクトの ID。CLUSTER_NAME
: Spark SQL ジョブの実行に使用している Dataproc クラスタの名前。REGION
: クラスタが配置されている Compute Engine リージョン。LOCATION
: BigQuery リソースのロケーション。CATALOG_NAME
: SQL ジョブで使用している Spark カタログの名前。WAREHOUSE_DIRECTORY
: データ ウェアハウスを含む Cloud Storage フォルダ。この値はgs://
で始まります。SPARK_SQL_COMMAND
: 実行する Spark SQL クエリ。このクエリには、リソースを作成するコマンドを含めます。たとえば、名前空間とテーブルを作成します。
インタラクティブ Spark
Spark に接続してカタログ プラグインをインストールする
BigQuery metastore のカタログ プラグインをインストールするには、SSH を使用して Dataproc クラスタに接続します。
- Google Cloud コンソールで、[VM インスタンス] ページに移動します。
Dataproc VM インスタンスに接続するには、仮想マシン インスタンスのリストで [SSH] をクリックします。出力は次のようになります。
Connected, host fingerprint: ssh-rsa ... Linux cluster-1-m 3.16.0-0.bpo.4-amd64 ... ... example-cluster@cluster-1-m:~$
ターミナルで、次の BigQuery metastore の初期化コマンドを実行します。
spark-sql \ --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.5.2/iceberg-spark-runtime-3.5_2.12-1.5.2.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.1-beta.jar \ --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \ --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \ --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \ --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY
次のように置き換えます。
CATALOG_NAME
: SQL ジョブで使用している Spark カタログの名前。PROJECT_ID
: Spark カタログがリンクされる BigQuery metastore カタログの Google Cloud プロジェクト ID。LOCATION
: BigQuery metastore の Google Cloud ロケーション。WAREHOUSE_DIRECTORY
: データ ウェアハウスを含む Cloud Storage フォルダ。この値はgs://
で始まります。
クラスタに正常に接続すると、Spark ターミナルに「
spark-sql
」というプロンプトが表示されます。spark-sql (default)>
BigQuery metastore リソースを管理する
これで BigQuery metastore に接続されました。BigQuery metastore に保存されているメタデータに基づいて、既存のリソースを表示したり、新しいリソースを作成したりできます。
たとえば、インタラクティブ Spark SQL セッションで次のコマンドを実行して、Iceberg 名前空間とテーブルを作成します。
カスタムの Iceberg カタログを使用する:
USE `CATALOG_NAME`;
名前空間を作成する:
CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;
作成した名前空間を使用する:
USE NAMESPACE_NAME;
Iceberg テーブルを作成する:
CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG;
テーブルに行を挿入する:
INSERT INTO TABLE_NAME VALUES (1, "first row");
テーブルの列を追加する:
ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double);
テーブルのメタデータを表示する:
DESCRIBE EXTENDED TABLE_NAME;
名前空間内のテーブルの一覧を取得する:
SHOW TABLES;
Zeppelin ノートブック
Google Cloud コンソールで、Dataproc の [クラスタ] ページに移動します。
使用するクラスタの名前をクリックします。
[クラスタの詳細] ページが開きます。
ナビゲーション メニューで [ウェブ インターフェース] をクリックします。
[コンポーネント ゲートウェイ] で [Zeppelin] をクリックします。Zeppelin ノートブック ページが開きます。
ナビゲーション メニューで [Notebook]、[+ Create new note] の順にクリックします。
ダイアログでノートブック名を入力します。デフォルトのインタープリタとして [Spark] が選択されたままにします。
[Create] をクリックします。新しいノートブックが作成されます。
ノートブックで設定メニューをクリックし、[Interpreter] をクリックします。
[Search interpreters] フィールドで「Spark」を検索します。
[Edit] をクリックします。
[Spark.jars] フィールドに、Spark JAR の URI を入力します。
https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.5.2/iceberg-spark-runtime-3.5_2.12-1.5.2.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.1-beta.jar
[Save] をクリックします。
[OK] をクリックします。
次の PySpark コードを Zeppelin ノートブックにコピーします。
%pyspark from pyspark.sql import SparkSession spark = SparkSession.builder \ .appName("BigQuery Metastore Iceberg") \ .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \ .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \ .getOrCreate() spark.sql("select version()").show() spark.sql("USE `CATALOG_NAME`;") spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;") spark.sql("USE NAMESPACE_NAME;") spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG;") spark.sql("DESCRIBE TABLE_NAME;").show()
次のように置き換えます。
CATALOG_NAME
: SQL ジョブに使用する Spark カタログの名前。PROJECT_ID
: Dataproc クラスタを含む Google Cloud プロジェクトの ID。WAREHOUSE_DIRECTORY
: データ ウェアハウスを含む Cloud Storage フォルダ。この値はgs://
で始まります。NAMESPACE_NAME
: Spark テーブルを参照する名前空間名。WAREHOUSE_DIRECTORY
: データ ウェアハウスが保存されている Cloud Storage フォルダの URI。TABLE_NAME
: Spark テーブルのテーブル名。
実行アイコンをクリックするか、
Shift-Enter
キーを押してコードを実行します。ジョブが完了すると、「Spark Job Finished」というステータス メッセージが表示され、出力にテーブルの内容が表示されます。