Cloud Storage 中的文字檔案到 Pub/Sub (串流) 範本

這個範本可以建立串流管道,持續輪詢新上傳到 Cloud Storage 的文字檔案,並逐行讀取每個檔案,然後將字串發布至 Pub/Sub 主題。此外,這個範本還能以內含 JSON 記錄的換行符號分隔檔案或 CSV 檔案格式,將記錄發布到 Pub/Sub 主題進行即時處理;您也可以針對 Pub/Sub 重送資料。

由於管道使用「Watch」轉換 (不支援排空的「SplittableDoFn」),因此會無限期持續執行,直到透過「cancel」而非「drain」手動終止。

目前的輪詢間隔固定為 10 秒一次。這個範本不會對個別記錄設定任何時間戳記,因此在執行期間,事件時間會等於發布時間。假如您的管道需要準確的時間才能處理作業,則請勿使用這個管道。

管道相關規定

  • 輸入檔案必須是換行符號分隔的 JSON 或 CSV 格式。在來源檔案中跨多行的記錄可能會導致下游出現問題,因為檔案中的每一行都會以一條訊息的形式發布至 Pub/Sub。
  • Pub/Sub 主題必須在執行前就已存在。
  • 管道會無限期持續執行,直到手動終止。

範本參數

必要參數

  • inputFilePattern:要讀取的輸入檔案模式,例如:gs://bucket-name/files/*.json
  • outputTopic:要寫入的 Pub/Sub 輸入主題,名稱的格式必須為 projects/<PROJECT_ID>/topics/<TOPIC_NAME>。例如:projects/your-project-id/topics/your-topic-name

執行範本

控制台

  1. 前往 Dataflow 的「Create job from template」(透過範本建立工作) 頁面。
  2. 前往「依據範本建立工作」
  3. 在「工作名稱」欄位中,輸入專屬工作名稱。
  4. 選用:如要使用區域端點,請從下拉式選單中選取值。預設區域為 us-central1

    如需可執行 Dataflow 工作的地區清單,請參閱「Dataflow 位置」。

  5. 從「Dataflow template」(Dataflow 範本) 下拉式選單中選取 the Text Files on Cloud Storage to Pub/Sub (Stream) template。
  6. 在提供的參數欄位中輸入參數值。
  7. 選用:如要從「僅需處理一次」切換至「至少一次」串流模式,請選取「至少一次」
  8. 按一下「Run Job」(執行工作)

gcloud

在殼層或終端機中執行範本:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Stream_GCS_Text_to_Cloud_PubSub \
    --region REGION_NAME\
    --staging-location STAGING_LOCATION\
    --parameters \
inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME

更改下列內容:

  • JOB_NAME: 您選擇的不重複工作名稱
  • REGION_NAME: 您要部署 Dataflow 工作的地區,例如 us-central1
  • STAGING_LOCATION:用於暫存本機檔案的位置 (例如 gs://your-bucket/staging)
  • TOPIC_NAME:您的 Pub/Sub 主題名稱
  • BUCKET_NAME:Cloud Storage bucket 的名稱
  • FILE_PATTERN:要從 Cloud Storage 值區讀取的檔案模式 glob (例如 path/*.csv)

API

如要使用 REST API 執行範本,請傳送 HTTP POST 要求。如要進一步瞭解 API 和授權範圍,請參閱 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Stream_GCS_Text_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/FILE_PATTERN",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
   }
}

更改下列內容:

  • PROJECT_ID: 您要執行 Dataflow 工作的專案 ID Google Cloud
  • JOB_NAME: 您選擇的不重複工作名稱
  • LOCATION: 您要部署 Dataflow 工作的地區,例如 us-central1
  • STAGING_LOCATION:用於暫存本機檔案的位置 (例如 gs://your-bucket/staging)
  • TOPIC_NAME:您的 Pub/Sub 主題名稱
  • BUCKET_NAME:Cloud Storage bucket 的名稱
  • FILE_PATTERN:要從 Cloud Storage 值區讀取的檔案模式 glob (例如 path/*.csv)

後續步驟