「Pub/Sub Avro 到 BigQuery」範本是一種串流管道,可將 Avro 資料從 Pub/Sub 訂閱項目擷取至 BigQuery 資料表。寫入 BigQuery 資料表時發生的所有錯誤都會串流至未處理的 Pub/Sub 主題。
針對這個情境執行 Dataflow 管道之前,請考慮Pub/Sub BigQuery 訂閱項目搭配 UDF 是否符合您的需求。
管道相關規定
- 輸入 Pub/Sub 訂閱項目必須已存在。
- Avro 記錄的結構定義檔案必須存在於 Cloud Storage。
- 未處理的 Pub/Sub 主題必須存在。
- 輸出 BigQuery 資料集必須已存在。
範本參數
必要參數
- schemaPath:Avro 結構定義檔案的 Cloud Storage 位置。例如:
gs://path/to/my/schema.avsc
。 - inputSubscription:要讀取的 Pub/Sub 輸入訂閱項目。例如:
projects/<PROJECT_ID>/subscription/<SUBSCRIPTION_ID>
。 - outputTableSpec:用於寫入輸出內容的 BigQuery 輸出資料表位置。例如
<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>
。視指定的createDisposition
而定,系統可能會使用使用者提供的 Avro 結構定義,自動建立輸出資料表。 - outputTopic:用於未處理記錄的 Pub/Sub 主題。例如:
projects/<PROJECT_ID>/topics/<TOPIC_NAME>
。
選用參數
- useStorageWriteApiAtLeastOnce:使用 Storage Write API 時,指定寫入語意。如要使用「至少一次」語意 (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics),請將這個參數設為 true。如要使用「僅限一次」語意,請將參數設為
false
。只有在useStorageWriteApi
為true
時,這項參數才會生效。預設值為false
。 - writeDisposition:BigQuery WriteDisposition (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload) 值。例如
WRITE_APPEND
、WRITE_EMPTY
或WRITE_TRUNCATE
。預設值為WRITE_APPEND
。 - createDisposition:BigQuery CreateDisposition (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload)。例如
CREATE_IF_NEEDED
和CREATE_NEVER
。預設值為CREATE_IF_NEEDED
。 - useStorageWriteApi:如果為 true,管道會使用 BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api)。預設值為
false
。詳情請參閱「使用 Storage Write API」(https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api)。 - numStorageWriteApiStreams:使用 Storage Write API 時,指定寫入串流的數量。如果
useStorageWriteApi
為true
,且useStorageWriteApiAtLeastOnce
為false
,則必須設定這個參數。預設值為 0。 - storageWriteApiTriggeringFrequencySec:使用 Storage Write API 時,指定觸發頻率 (以秒為單位)。如果
useStorageWriteApi
為true
,且useStorageWriteApiAtLeastOnce
為false
,則必須設定這個參數。
執行範本
控制台
- 前往 Dataflow 的「Create job from template」(透過範本建立工作) 頁面。 前往「依據範本建立工作」
- 在「工作名稱」欄位中,輸入專屬工作名稱。
- 選用:如要使用區域端點,請從下拉式選單中選取值。預設區域為
us-central1
。如需可執行 Dataflow 工作的地區清單,請參閱「Dataflow 位置」。
- 從「Dataflow template」(Dataflow 範本) 下拉式選單中選取 the Pub/Sub Avro to BigQuery template。
- 在提供的參數欄位中輸入參數值。
- 按一下「Run Job」(執行工作)。
gcloud
在殼層或終端機中執行範本:
gcloud dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Avro_to_BigQuery \ --parameters \ schemaPath=SCHEMA_PATH,\ inputSubscription=SUBSCRIPTION_NAME,\ outputTableSpec=BIGQUERY_TABLE,\ outputTopic=DEADLETTER_TOPIC
更改下列內容:
JOB_NAME
: 您選擇的不重複工作名稱REGION_NAME
: 您要部署 Dataflow 工作的地區,例如us-central1
VERSION
: 您要使用的範本版本您可以使用下列值:
latest
,使用範本的最新版本,該版本位於值區中非依日期命名的上層資料夾:gs://dataflow-templates-REGION_NAME/latest/- 版本名稱 (例如
2023-09-12-00_RC00
),用於指定範本版本,該版本會以巢狀結構存放在值區中依日期命名的上層資料夾中:gs://dataflow-templates-REGION_NAME/
SCHEMA_PATH
:Avro 結構定義檔案的 Cloud Storage 路徑 (例如gs://MyBucket/file.avsc
)SUBSCRIPTION_NAME
:Pub/Sub 輸入訂閱項目名稱BIGQUERY_TABLE
:BigQuery 輸出資料表名稱DEADLETTER_TOPIC
:用於未處理佇列的 Pub/Sub 主題
API
如要使用 REST API 執行範本,請傳送 HTTP POST 要求。如要進一步瞭解 API 和授權範圍,請參閱 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_Avro_to_BigQuery", "parameters": { "schemaPath": "SCHEMA_PATH", "inputSubscription": "SUBSCRIPTION_NAME", "outputTableSpec": "BIGQUERY_TABLE", "outputTopic": "DEADLETTER_TOPIC" } } }
更改下列內容:
JOB_NAME
: 您選擇的不重複工作名稱LOCATION
: 您要部署 Dataflow 工作的地區,例如us-central1
VERSION
: 您要使用的範本版本您可以使用下列值:
latest
,使用範本的最新版本,該版本位於值區中非依日期命名的上層資料夾:gs://dataflow-templates-REGION_NAME/latest/- 版本名稱 (例如
2023-09-12-00_RC00
),用於指定範本版本,該版本會以巢狀結構存放在值區中依日期命名的上層資料夾中:gs://dataflow-templates-REGION_NAME/
SCHEMA_PATH
:Avro 結構定義檔案的 Cloud Storage 路徑 (例如gs://MyBucket/file.avsc
)SUBSCRIPTION_NAME
:Pub/Sub 輸入訂閱項目名稱BIGQUERY_TABLE
:BigQuery 輸出資料表名稱DEADLETTER_TOPIC
:用於未處理佇列的 Pub/Sub 主題
後續步驟
- 瞭解 Dataflow 範本。
- 請參閱 Google 提供的範本清單。