Apache Spark を使用して Pub/Sub Lite メッセージを書き込む
Pub/Sub Lite Spark Connector は、Pub/Sub Lite の Apache Spark Structured Streaming の入出力ソースとしての使用をサポートするオープンソース Java クライアント ライブラリです。コネクタは、Dataproc を含むすべての Apache Spark ディストリビューションで機能します。
このクイックスタートでは、次の方法について説明します。
- Pub/Sub Lite からメッセージを読み取る
- Pub/Sub Lite にメッセージを書き込む
Dataproc Spark クラスタからの PySpark の使用。
始める前に
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Pub/Sub Lite, Dataproc, Cloud Storage, Logging APIs.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Pub/Sub Lite, Dataproc, Cloud Storage, Logging APIs.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
設定
プロジェクトの変数を作成します。
export PROJECT_ID=$(gcloud config get-value project)
export PROJECT_NUMBER=$(gcloud projects list \ --filter="projectId:$PROJECT_ID" \ --format="value(PROJECT_NUMBER)")
Cloud Storage バケットを作成します。Cloud Storage バケット名は、グローバルに一意である必要があります。
export BUCKET=your-bucket-name
gcloud storage buckets create gs://$BUCKET
サポートされているロケーションで Pub/Sub Lite トピックとサブスクリプションを作成します。Pub/Sub Lite の予約を使用する場合は、トピックを作成するをご覧ください。
export TOPIC=your-lite-topic-id
export SUBSCRIPTION=your-lite-subscription-id
export PUBSUBLITE_LOCATION=your-lite-location
gcloud pubsub lite-topics create $TOPIC \ --location=$PUBSUBLITE_LOCATION \ --partitions=2 \ --per-partition-bytes=30GiB
gcloud pubsub lite-subscriptions create $SUBSCRIPTION \ --location=$PUBSUBLITE_LOCATION \ --topic=$TOPIC
Dataproc クラスタを作成します。
export DATAPROC_REGION=your-dataproc-region
export CLUSTER_ID=your-dataproc-cluster-id
gcloud dataproc clusters create $CLUSTER_ID \ --region $DATAPROC_REGION \ --image-version 2.1 \ --scopes 'https://www.googleapis.com/auth/cloud-platform' \ --enable-component-gateway \ --bucket $BUCKET
--region
: Pub/Sub Lite トピックとサブスクリプションが存在する、サポートされている Dataproc のリージョン。--image-version
: クラスタにインストールされている Apache Spark のバージョンを決定するクラスタのイメージ バージョン。現在、Pub/Sub Lite Spark コネクタは Apache Spark 3.x.x. をサポートしているため、2.x.x イメージ リリース バージョンを選択します。--scopes
: 同じプロジェクトの Google Cloud サービスへの API アクセスを有効にします。--enable-component-gateway
: Apache Spark ウェブ UI へのアクセスを有効にします。--bucket
: クラスタジョブの依存関係、ドライバ出力、クラスタ構成ファイルの保存に使用するステージング Cloud Storage バケット。
クイックスタート リポジトリのクローンを作成し、サンプルコード ディレクトリに移動します。
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
cd python-docs-samples/pubsublite/spark-connector/
Pub/Sub Lite への書き込み
次の例では、以下を行います。
spark.sql.Row
としてフォーマットされた連続する数値とタイムスタンプを生成するレートソースを作成する- Pub/Sub Lite Spark Connector の
writeStream
API によって、必要なテーブル スキーマに合わせてデータを変換する - 既存の Pub/Sub Lite トピックにデータを書き込む
書き込みジョブを Dataproc に送信するには、以下を行います。
Console
- PySpark スクリプトを Cloud Storage バケットにアップロードします。
- Cloud Storage コンソール に移動します。
- バケットを選択します。
- [ファイルをアップロード] を使用して、使用したい PySpark スクリプトをアップロードします。
- Dataproc クラスタにジョブを送信します。
- Dataproc コンソールに移動します。
- ジョブに移動します。
- [ジョブを送信] をクリックします。
- ジョブの詳細を入力します。
- [クラスタ] で、クラスタを選択します。
- [ジョブ] で、ジョブ ID に名前を付けます。
- [ジョブタイプ] で、PySpark を選択します。
- [メインの Python ファイル] の場合は、
gs://
で始まるアップロード済みの PySpark スクリプトの gcloud storage URI を指定します。 - [Jar ファイル] で、Maven から最新の Spark コネクタ バージョンを選択し、ダウンロード オプションに依存関係がある jar を探してそのリンクをコピーします。
- 引数については、GitHub から完全な PySpark スクリプトを使用する場合は、
--project_number=
PROJECT_NUMBER、--location=
PUBSUBLITE_LOCATION、--topic_id=
TOPIC_ID を入力します。To-Do が完了した上記の PySpark スクリプトをコピーする場合は、空白のままにします。 - [プロパティ] で、キー
spark.master
と値yarn
を入力します。 - [送信] をクリックします。
gcloud
gcloud dataproc jobs submit pyspark コマンドを使用して、ジョブを Dataproc に送信します。
gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
--region=$DATAPROC_REGION \
--cluster=$CLUSTER_ID \
--jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
--driver-log-levels=root=INFO \
--properties=spark.master=yarn \
-- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --topic_id=$TOPIC
--region
: 事前に選択されている Dataproc のリージョン。--cluster
: Dataproc のクラスタ名。--jars
: Cloud Storage の公開バケット内の依存関係を持つ Pub/Sub Lite Spark コネクタの uber jar。Maven の依存関係を含む uber jar をダウンロードするには、こちらのリンクにアクセスしてください。--driver-log-levels
: ルートレベルでロギングレベルを INFO に設定します。--properties
: Spark マスターに YARN リソース マネージャーを使用します。--
: スクリプトで必要な引数を指定します。
writeStream
オペレーションが成功すると、以下のようなログメッセージがローカルおよび Google Cloud コンソールのジョブの詳細ページに表示されます。
INFO com.google.cloud.pubsublite.spark.PslStreamWriter: Committed 1 messages for epochId ..
Pub/Sub Lite からの読み取り
次の例では、readStream
API を使用して既存の Pub/Sub Lite サブスクリプションからメッセージを読み取ります。コネクタは、spark.sql.Row
という形式の固定テーブル スキーマに準拠するメッセージを出力します。
読み取りジョブを Dataproc に送信するには、以下を行います。
Console
- PySpark スクリプトを Cloud Storage バケットにアップロードします。
- Cloud Storage コンソール に移動します。
- バケットを選択します。
- [ファイルをアップロード] を使用して、使用したい PySpark スクリプトをアップロードします。
- Dataproc クラスタにジョブを送信します。
- Dataproc コンソールに移動します。
- ジョブに移動します。
- [ジョブを送信] をクリックします。
- ジョブの詳細を入力します。
- [クラスタ] で、クラスタを選択します。
- [ジョブ] で、ジョブ ID に名前を付けます。
- [ジョブタイプ] で、PySpark を選択します。
- [メインの Python ファイル] の場合は、
gs://
で始まるアップロード済みの PySpark スクリプトの gcloud storage URI を指定します。 - [Jar ファイル] で、Maven から最新の Spark コネクタ バージョンを選択し、ダウンロード オプションに依存関係がある jar を探してそのリンクをコピーします。
- 引数については、GitHub から完全な PySpark スクリプトを使用する場合は、
--project_number=
PROJECT_NUMBER、--location=
PUBSUBLITE_LOCATION、--subscription_id=
SUBSCRIPTION_ID を入力します。To-Do が完了した上記の PySpark スクリプトをコピーする場合は、空白のままにします。 - [プロパティ] で、キー
spark.master
と値yarn
を入力します。 - [送信] をクリックします。
gcloud
gcloud dataproc jobs submit pyspark コマンドを再び使用して、ジョブを Dataproc に送信します。
gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
--region=$DATAPROC_REGION \
--cluster=$CLUSTER_ID \
--jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
--driver-log-levels=root=INFO \
--properties=spark.master=yarn \
-- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --subscription_id=$SUBSCRIPTION
--region
: 事前に選択されている Dataproc のリージョン。--cluster
: Dataproc のクラスタ名。--jars
: Cloud Storage の公開バケット内の依存関係を持つ Pub/Sub Lite Spark コネクタの uber jar。Maven の依存関係を含む uber jar をダウンロードするには、こちらのリンクにアクセスしてください。--driver-log-levels
: ルートレベルでロギングレベルを INFO に設定します。--properties
: Spark マスターに YARN リソース マネージャーを使用します。--
: スクリプトに必要な引数を指定します。
readStream
オペレーションが成功すると、以下のようなログメッセージがローカルおよび Google Cloud コンソールのジョブの詳細ページに表示されます。
+--------------------+---------+------+---+----+--------------------+--------------------+----------+
| subscription|partition|offset|key|data| publish_timestamp| event_timestamp|attributes|
+--------------------+---------+------+---+----+--------------------+--------------------+----------+
|projects/50200928...| 0| 89523| 0| .|2021-09-03 23:01:...|2021-09-03 22:56:...| []|
|projects/50200928...| 0| 89524| 1| .|2021-09-03 23:01:...|2021-09-03 22:56:...| []|
|projects/50200928...| 0| 89525| 2| .|2021-09-03 23:01:...|2021-09-03 22:56:...| []|
Pub/Sub Lite のメッセージを再生、削除する
Pub/Sub Lite Spark コネクタを使用して Pub/Sub Lite から読み取る場合、Apache Spark システムがパーティション内のオフセットを独自に追跡するため、シーク オペレーションは機能しません。回避策は、ワークフローをドレイン、シークして再起動することです。
クリーンアップ
このページで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、次の手順を行います。
トピックとサブスクリプションを削除します。
gcloud pubsub lite-topics delete $TOPIC
gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
Dataproc クラスタを削除します。
gcloud dataproc clusters delete $CLUSTER_ID --region=$DATAPROC_REGION
Cloud Storage バケットを削除します。
gcloud storage rm gs://$BUCKET
次のステップ
Java のワードカウントの例で Pub/Sub Lite Spark コネクタを確認する。
Google Cloud プロダクトの Spark コネクタ: BigQuery コネクタ、Bigtable コネクタ、Cloud Storage コネクタ。