這個範本可以建立串流管道,持續輪詢新上傳到 Cloud Storage 的文字檔案,並逐行讀取每個檔案,然後將字串發布至 Pub/Sub 主題。此外,這個範本還能以內含 JSON 記錄的換行符號分隔檔案或 CSV 檔案格式,將記錄發布到 Pub/Sub 主題進行即時處理;您也可以針對 Pub/Sub 重送資料。
由於管道使用「Watch」轉換 (不支援排空的「SplittableDoFn」),因此會無限期持續執行,直到透過「cancel」而非「drain」手動終止。
目前的輪詢間隔固定為 10 秒一次。這個範本不會對個別記錄設定任何時間戳記,因此在執行期間,事件時間會等於發布時間。假如您的管道需要準確的時間才能處理作業,則請勿使用這個管道。
管道相關規定
- 輸入檔案必須是換行符號分隔的 JSON 或 CSV 格式。在來源檔案中跨多行的記錄可能會導致下游出現問題,因為檔案中的每一行都會以一條訊息的形式發布至 Pub/Sub。
- Pub/Sub 主題必須在執行前就已存在。
- 管道會無限期持續執行,直到手動終止。
範本參數
必要參數
- inputFilePattern:要讀取的輸入檔案模式,例如:
gs://bucket-name/files/*.json
。 - outputTopic:要寫入的 Pub/Sub 輸入主題,名稱的格式必須為
projects/<PROJECT_ID>/topics/<TOPIC_NAME>
。例如:projects/your-project-id/topics/your-topic-name
。
執行範本
控制台
- 前往 Dataflow 的「Create job from template」(透過範本建立工作) 頁面。 前往「依據範本建立工作」
- 在「工作名稱」欄位中,輸入專屬工作名稱。
- 選用:如要使用區域端點,請從下拉式選單中選取值。預設區域為
us-central1
。如需可執行 Dataflow 工作的地區清單,請參閱「Dataflow 位置」。
- 從「Dataflow template」(Dataflow 範本) 下拉式選單中選取 the Text Files on Cloud Storage to Pub/Sub (Stream) template。
- 在提供的參數欄位中輸入參數值。
- 選用:如要從「僅需處理一次」切換至「至少一次」串流模式,請選取「至少一次」。
- 按一下「Run Job」(執行工作)。
gcloud
在殼層或終端機中執行範本:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Stream_GCS_Text_to_Cloud_PubSub \ --region REGION_NAME\ --staging-location STAGING_LOCATION\ --parameters \ inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\ outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME
更改下列內容:
JOB_NAME
: 您選擇的不重複工作名稱REGION_NAME
: 您要部署 Dataflow 工作的地區,例如us-central1
STAGING_LOCATION
:用於暫存本機檔案的位置 (例如gs://your-bucket/staging
)TOPIC_NAME
:您的 Pub/Sub 主題名稱BUCKET_NAME
:Cloud Storage bucket 的名稱FILE_PATTERN
:要從 Cloud Storage 值區讀取的檔案模式 glob (例如path/*.csv
)
API
如要使用 REST API 執行範本,請傳送 HTTP POST 要求。如要進一步瞭解 API 和授權範圍,請參閱 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Stream_GCS_Text_to_Cloud_PubSub { "jobName": "JOB_NAME", "environment": { "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputFilePattern": "gs://BUCKET_NAME/FILE_PATTERN", "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME" } }
更改下列內容:
PROJECT_ID
: 您要執行 Dataflow 工作的專案 ID Google CloudJOB_NAME
: 您選擇的不重複工作名稱LOCATION
: 您要部署 Dataflow 工作的地區,例如us-central1
STAGING_LOCATION
:用於暫存本機檔案的位置 (例如gs://your-bucket/staging
)TOPIC_NAME
:您的 Pub/Sub 主題名稱BUCKET_NAME
:Cloud Storage bucket 的名稱FILE_PATTERN
:要從 Cloud Storage 值區讀取的檔案模式 glob (例如path/*.csv
)
後續步驟
- 瞭解 Dataflow 範本。
- 請參閱 Google 提供的範本清單。