使用 Dataflow 範本建立串流管道
本快速入門導覽課程說明如何使用 Google 提供的 Dataflow 範本建立串流管道。具體來說,本快速入門導覽課程會以 Pub/Sub 到 BigQuery 範本為例。
「Pub/Sub 到 BigQuery 範本」是一個串流管道,可從 Pub/Sub 主題讀取 JSON 格式的訊息,並寫入 BigQuery 資料表。
如要直接在 Google Cloud 控制台按照逐步指南操作,請按一下「Guide me」(逐步引導):
事前準備
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Pub/Sub, and Resource Manager APIs.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Pub/Sub, and Resource Manager APIs.
- 建立 Cloud Storage bucket:
- In the Google Cloud console, go to the Cloud Storage Buckets page.
- Click Create.
- On the Create a bucket page, enter your bucket information. To go to the next
step, click Continue.
- For Name your bucket, enter a unique bucket name. Don't include sensitive information in the bucket name, because the bucket namespace is global and publicly visible.
-
In the Choose where to store your data section, do the following:
- Select a Location type.
- Choose a location where your bucket's data is permanently stored from the Location type drop-down menu.
- If you select the dual-region location type, you can also choose to enable turbo replication by using the relevant checkbox.
- To set up cross-bucket replication, select
Add cross-bucket replication via Storage Transfer Service and
follow these steps:
Set up cross-bucket replication
- In the Bucket menu, select a bucket.
In the Replication settings section, click Configure to configure settings for the replication job.
The Configure cross-bucket replication pane appears.
- To filter objects to replicate by object name prefix, enter a prefix that you want to include or exclude objects from, then click Add a prefix.
- To set a storage class for the replicated objects, select a storage class from the Storage class menu. If you skip this step, the replicated objects will use the destination bucket's storage class by default.
- Click Done.
-
In the Choose how to store your data section, do the following:
- In the Set a default class section, select the following: Standard.
- To enable hierarchical namespace, in the Optimize storage for data-intensive workloads section, select Enable hierarchical namespace on this bucket.
- In the Choose how to control access to objects section, select whether or not your bucket enforces public access prevention, and select an access control method for your bucket's objects.
-
In the Choose how to protect object data section, do the
following:
- Select any of the options under Data protection that you
want to set for your bucket.
- To enable soft delete, click the Soft delete policy (For data recovery) checkbox, and specify the number of days you want to retain objects after deletion.
- To set Object Versioning, click the Object versioning (For version control) checkbox, and specify the maximum number of versions per object and the number of days after which the noncurrent versions expire.
- To enable the retention policy on objects and buckets, click the Retention (For compliance) checkbox, and then do the following:
- To enable Object Retention Lock, click the Enable object retention checkbox.
- To enable Bucket Lock, click the Set bucket retention policy checkbox, and choose a unit of time and a length of time for your retention period.
- To choose how your object data will be encrypted, expand the Data encryption section (Data encryption method. ), and select a
- Select any of the options under Data protection that you
want to set for your bucket.
- Click Create.
- 複製下列內容,因為後續章節會用到:
- Cloud Storage 值區名稱。
- 您的 Google Cloud 專案 ID。
如要找出這個 ID,請參閱「識別專案」。
如要完成本快速入門導覽中的步驟,使用者帳戶必須具備 Dataflow 管理員角色和服務帳戶使用者角色。Compute Engine 預設服務帳戶必須具備 Dataflow 工作者角色、Storage 物件管理員角色、Pub/Sub 編輯者角色、BigQuery 資料編輯者角色和檢視者角色。如要在 Google Cloud 控制台中新增必要角色,請按照下列步驟操作:
- 前往「IAM」IAM頁面,然後選取專案。
前往「IAM」頁面 - 在包含您使用者帳戶的資料列中,按一下 「Edit principal」(編輯主體)。按一下「新增其他角色」 ,然後新增下列角色:「Dataflow 管理員」和「服務帳戶使用者」。
- 按一下 [儲存]。
- 在包含「Compute Engine 預設服務帳戶」(PROJECT_NUMBER-compute@developer.gserviceaccount.com) 的資料列中,按一下 「編輯主體」。
- 按一下「新增其他角色」 ,然後新增下列角色:Dataflow 工作者、Storage 物件管理員、Pub/Sub 編輯者、BigQuery 資料編輯者、檢視者。
按一下 [儲存]。
如要進一步瞭解如何授予角色,請參閱「使用控制台授予 IAM 角色」。
- 前往「IAM」IAM頁面,然後選取專案。
- 根據預設,每個新專案一開始都會具備預設網路。如果專案的預設網路已停用或刪除,您必須在專案中建立網路,並為使用者帳戶指派Compute 網路使用者角色 (
roles/compute.networkUser
)。
建立 BigQuery 資料集與資料表
使用 Google Cloud 控制台,為 Pub/Sub 主題建立具有適當結構定義的 BigQuery 資料集和資料表。
在這個範例中,資料集的名稱為 taxirides
,資料表的名稱為 realtime
。如要建立這個資料集和資料表,請按照下列步驟操作:
- 前往「BigQuery」BigQuery頁面。
前往 BigQuery - 在「Explorer」面板中,找到要建立資料集的專案,然後依序點選旁邊的 「查看動作」和「建立資料集」。
- 在「建立資料集」面板中,按照下列步驟操作:
- 在「Dataset ID」(資料集 ID) 中輸入
taxirides
。 每個 Google Cloud 專案的資料集 ID 皆不得重複。 - 針對「Location type」(位置類型) 選取「Multi-region」(多區域),然後選取「US (multiple regions in United States)」(us (多個美國區域))。公開資料集儲存在
US
多地區位置。為簡單起見,請將資料集放在相同的位置。 - 保留其他預設設定,然後按一下「建立資料集」。
- 在
「Explorer」 面板中展開專案。 - 在
taxirides
資料集旁邊,依序按一下 「View actions」(查看動作) 和「Create table」(建立資料表)。 - 在「Create table」(建立資料表) 面板中按照下列步驟操作:
- 在「Source」(來源) 區段中,針對「Create table from」(使用下列資料建立資料表),選取「Empty table」(空白資料表)。
- 在「Destination」(目的地) 區段中,在「Table」(資料表) 輸入
realtime
。 - 在「Schema」(結構定義) 區段中,按一下「Edit as text」(以文字形式編輯) 切換鈕,並在輸入框中貼上以下的結構定義:
ride_id:string,point_idx:integer,latitude:float,longitude:float,timestamp:timestamp, meter_reading:float,meter_increment:float,ride_status:string,passenger_count:integer
- 在「Partition and cluster settings」(分區與叢集設定) 區段的「Partitioning」(分區) 中,選取「Timestamp」(時間戳記) 欄位。
- 保留其他預設設定,然後按一下「建立資料表」。
執行管道
使用 Google 提供的「Pub/Sub 到 BigQuery」範本執行串流管道。管道會從輸入主題取得輸入資料。
- 前往 Dataflow 的「Jobs」(工作) 頁面:
前往「Jobs」(工作) - 按一下
「Create job from template」(利用範本建立工作) 。 - 在 Dataflow 工作的「Job name」(工作名稱) 中輸入
taxi-data
。 - 在「Dataflow template」(Dataflow 範本) 欄位中,選取 「Pub/Sub to BigQuery」(Pub/Sub 到 BigQuery) 範本。
- 在「BigQuery output table」(BigQuery 輸出資料表) 中輸入下列資訊:
PROJECT_ID:taxirides.realtime
將
PROJECT_ID
改為您建立 BigQuery 資料集的專案 ID。 - 在「Optional source parameters」(選用來源參數) 專區的「Input Pub/Sub topic」(輸入 Pub/Sub 主題) 部分,按一下「Enter topic manually」(手動輸入主題)。
- 在「Topic name」(主題名稱) 對話方塊中輸入以下內容,然後按一下「Save」(儲存):
projects/pubsub-public-data/topics/taxirides-realtime
這個公開的 Pub/Sub 主題是以紐約市計程車暨禮車管理局的公開資料集為基礎,以下是這個主題的訊息範例,格式為 JSON:
{ "ride_id": "19c41fc4-e362-4be5-9d06-435a7dc9ba8e", "point_idx": 217, "latitude": 40.75399, "longitude": -73.96302, "timestamp": "2021-03-08T02:29:09.66644-05:00", "meter_reading": 6.293821, "meter_increment": 0.029003782, "ride_status": "enroute", "passenger_count": 1 }
- 在「Temp location」(臨時位置) 中輸入下列內容:
gs://BUCKET_NAME/temp/
請將
BUCKET_NAME
改成您的 Cloud Storage 值區名稱。temp
資料夾會儲存暫存檔案,例如暫存管道工作。 - 如果專案沒有預設網路,請輸入「網路」和「子網路」。詳情請參閱指定網路和子網路。
- 按一下「Run Job」(執行工作)。
查看結果
如要查看寫入realtime
資料表的資料,請按照下列步驟操作:
前往「BigQuery」頁面
按一下「撰寫新查詢」
。系統會隨即開啟新的「Editor」(編輯器) 分頁。SELECT * FROM `PROJECT_ID.taxirides.realtime` WHERE `timestamp` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY) LIMIT 1000
將
PROJECT_ID
替換為您建立 BigQuery 資料集的專案 ID。最多可能需要五分鐘,資料才會顯示於資料表。按一下「執行」。
查詢會傳回過去 24 小時內新增至資料表的資料列。您也可以使用標準 SQL 執行查詢。
清除所用資源
如要避免系統向您的 Google Cloud 帳戶收取本頁所用資源的費用,請按照下列步驟操作。
刪除專案
如要避免付費,最簡單的方法就是刪除您為快速入門導覽課程建立的 Google Cloud 專案。- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
刪除個別資源
如要保留您在本快速入門中使用的 Google Cloud 專案,請刪除個別資源:
- 前往 Dataflow 的「Jobs」(工作) 頁面:
前往「Jobs」(工作) - 從工作清單中選取串流工作。
- 按一下導覽區中的「停止」。
- 在「Stop job」(停止工作) 對話方塊中,取消或排除管道,然後按一下「Stop job」(停止工作)。
- 前往「BigQuery」頁面
前往 BigQuery - 在「Explorer」面板中展開專案。
- 在要刪除的資料集旁邊,依序按一下 「View actions」(查看動作) 和「Open」(開啟)。
- 在詳細資料面板中,按一下「刪除資料集」,然後按照指示操作。
- In the Google Cloud console, go to the Cloud Storage Buckets page.
- Click the checkbox for the bucket that you want to delete.
- To delete the bucket, click Delete, and then follow the instructions.