Dataflow テンプレートを使用してストリーミング パイプラインを作成する
このドキュメントでは、Google 提供の Dataflow テンプレートを使用してストリーミング パイプラインを作成する方法を説明します。具体的には、クイックスタートで例として Pub/Sub to BigQuery テンプレートを使用します。
Pub/Sub to BigQuery テンプレートは、Pub/Sub トピックから JSON 形式のメッセージを読み取り、BigQuery テーブルに書き込むことができるストリーミング パイプラインです。
このタスクの手順をガイドに沿って Google Cloud コンソールで直接行う場合は、「ガイドを表示」をクリックしてください。
始める前に
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Pub/Sub, and Resource Manager APIs.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Pub/Sub, and Resource Manager APIs.
- Cloud Storage バケットを作成します。
- In the Google Cloud console, go to the Cloud Storage Buckets page.
- Click Create bucket.
- On the Create a bucket page, enter your bucket information. To go to the next
step, click Continue.
- For Name your bucket, enter a unique bucket name. Don't include sensitive information in the bucket name, because the bucket namespace is global and publicly visible.
-
For Choose where to store your data, do the following:
- Select a Location type option.
- Select a Location option.
- For Choose a default storage class for your data, select the following: Standard.
- For Choose how to control access to objects, select an Access control option.
- For Advanced settings (optional), specify an encryption method, a retention policy, or bucket labels.
- Click Create.
- 次のものをコピーします。これらは以後のセクションで使用されます。
- Cloud Storage バケット名。
- Google Cloud プロジェクト ID。
ID を調べる方法については、プロジェクトの識別をご覧ください。
このクイックスタートの手順を最後まで行うには、ユーザー アカウントに Dataflow 管理者ロールとサービス アカウント ユーザー ロールが必要です。Compute Engine のデフォルトのサービス アカウントには、Dataflow ワーカーロール、Storage オブジェクト管理者ロール、Pub/Sub 編集者ロール、BigQuery データ編集者ロール、閲覧者ロールが必要です。Google Cloud コンソールで必要なロールを追加する手順は次のとおりです。
- [IAM] ページに移動して、プロジェクトを選択します。
[IAM] に移動 - ユーザー アカウントを含む行で、 [プリンシパルを編集] をクリックします。[ 別のロールを追加] をクリックし、[Dataflow 管理者] と [サービス アカウント ユーザー] のロールを追加します。
- [保存] をクリックします。
- Compute Engine のデフォルトのサービス アカウント(PROJECT_NUMBER-compute@developer.gserviceaccount.com)を含む行で、 [プリンシパルを編集します] をクリックします。
- [ 別のロールを追加] をクリックし、Dataflow ワーカー、ストレージ オブジェクト管理者、Pub/Sub 編集者、BigQuery データ編集者、閲覧者のロールを追加します。
[保存] をクリックします。
ロール付与の詳細については、コンソールを使用して IAM ロールを付与するをご覧ください。
- [IAM] ページに移動して、プロジェクトを選択します。
- デフォルトでは、新しいプロジェクトはデフォルト ネットワークで開始されます。プロジェクトのデフォルト ネットワークが無効または削除されている場合、Compute ネットワーク ユーザーの役割(
roles/compute.networkUser
)を含むユーザー アカウント用のプロジェクト内にネットワークが必要です。
BigQuery データセットとテーブルを作成する
Google Cloud Console で Pub/Sub トピックに適したスキーマを使用して、BigQuery のデータセットとテーブルを作成します。
この例で、データセットの名前は taxirides
、テーブルの名前は realtime
です。このデータセットとテーブルを作成するには、以下の操作を行います。
- [BigQuery] ページに移動します。
[BigQuery] に移動 - [エクスプローラ] パネルで、データセットを作成するプロジェクトの横にある [ アクションを表示] をクリックしてから、[データセットを作成] をクリックします。
- [データセットを作成] パネルで、次の操作を行います。
- [データセット ID] に
taxirides
を入力します。 データセット ID は、Google Cloud プロジェクトごとに一意です。 - [ロケーション タイプ] で [マルチリージョン] を選択してから、[US(米国の複数のリージョン)] を選択します。一般公開データセットは
US
マルチリージョン ロケーションに保存されています。わかりやすくするため、データセットを同じロケーションに配置します。 - その他のデフォルト設定はそのままにして、[データセットを作成] をクリックします。
- [
エクスプローラ ] パネルで、プロジェクトを開きます。 taxirides
データセットの隣にある 「アクションを表示」をクリックし、[テーブルを作成] をクリックします。- [テーブルを作成] パネルで、次の操作を行います。
- [ソース] セクションの [テーブルの作成元] で [空のテーブル] を選択します。
- [送信先] セクションの [テーブル] に「
realtime
」と入力します。 - [スキーマ] セクションで [テキストとして編集] をクリックし、次のスキーマ定義をボックスに貼り付けます。
ride_id:string,point_idx:integer,latitude:float,longitude:float,timestamp:timestamp, meter_reading:float,meter_increment:float,ride_status:string,passenger_count:integer
- [パーティションとクラスタの設定] セクションの [パーティショニング] で、[タイムスタンプ] フィールドを選択します。
- その他のデフォルト設定はそのままにして、[テーブルを作成] をクリックします。
パイプラインを実行する
Google が提供する Pub/Sub to BigQuery テンプレートを使用して、ストリーミング パイプラインを実行します。パイプラインは入力トピックから受信データを取得します。
- Dataflow の [ジョブ] ページに移動します。
[ジョブ] に移動 - [
テンプレートからジョブを作成 ] をクリックします。 - Dataflow ジョブの [ジョブ名] として「
taxi-data
」と入力します。 - [Dataflow テンプレート] で、[Pub/Sub to BigQuery] テンプレートを選択します。
- [BigQuery output table] に、次のテキストを入力します。
PROJECT_ID:taxirides.realtime
PROJECT_ID
は、BigQuery データセットを作成したプロジェクトのプロジェクト ID に置き換えます。 - [オプションのソースパラメータ] セクションの [Pub/Sub トピックを入力] で、[トピックを手動で入力] をクリックします。
- ダイアログで、[トピック名] に次のように入力し、[保存] をクリックします。
projects/pubsub-public-data/topics/taxirides-realtime
一般公開されている Pub/Sub トピックは、NYC Taxi & Limousine Commission のオープン データセットに基づいています。このトピックの JSON 形式のサンプル メッセージを次に示します。
{ "ride_id": "19c41fc4-e362-4be5-9d06-435a7dc9ba8e", "point_idx": 217, "latitude": 40.75399, "longitude": -73.96302, "timestamp": "2021-03-08T02:29:09.66644-05:00", "meter_reading": 6.293821, "meter_increment": 0.029003782, "ride_status": "enroute", "passenger_count": 1 }
- [一時的な保存場所] に次のように入力します。
gs://BUCKET_NAME/temp/
BUCKET_NAME
を Cloud Storage バケットの名前に置き換えます。temp
フォルダには、ステージング済みのパイプライン ジョブなどの一時ファイルが保存されます。 - プロジェクトにデフォルト ネットワークがない場合は、[ネットワーク] と [サブネットワーク] を入力します。詳細については、ネットワークとサブネットワークの指定をご覧ください。
- [ジョブを実行] をクリックします。
結果を表示する
realtime
テーブルに書き込まれたデータを表示する方法は次のとおりです。
[BigQuery] ページに移動します。
[
クエリを新規作成] をクリックします。新しいエディタタブが開きます。SELECT * FROM `PROJECT_ID.taxirides.realtime` WHERE `timestamp` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY) LIMIT 1000
PROJECT_ID
は、BigQuery データセットを作成したプロジェクトのプロジェクト ID に置き換えます。テーブルにデータが表示されるまで、最大で 5 分かかることがあります。[実行] をクリックします。
クエリは、過去 24 時間以内にテーブルに追加された行を返します。標準 SQL を使用してクエリを実行することもできます。
クリーンアップ
このページで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、次の手順を実施します。
プロジェクトの削除
課金を停止する最も簡単な方法は、クイックスタート用に作成した Google Cloud プロジェクトを削除することです。- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
個々のリソースの削除
このクイックスタートで使用した Google Cloud プロジェクトを残しておく場合は、個々のリソースを削除します。
- Dataflow の [ジョブ] ページに移動します。
[ジョブ] に移動 - ジョブリストからストリーミング ジョブを選択します。
- ナビゲーションで、[停止] をクリックします。
- [ジョブの停止] ダイアログで、パイプラインを [キャンセル] または [ドレイン] し、[ジョブの停止] をクリックします。
- [BigQuery] ページに移動します。
[BigQuery] に移動 - [エクスプローラ] パネルで、プロジェクトを展開します。
- 削除するデータセットの横にある [ アクションを表示] をクリックし、[開く] をクリックします。
- 詳細パネルで [データセットを削除] をクリックして、指示に従って操作します。
- In the Google Cloud console, go to the Cloud Storage Buckets page.
- Click the checkbox for the bucket that you want to delete.
- To delete the bucket, click Delete, and then follow the instructions.