「MQTT 到 Pub/Sub」範本是一個串流管道,可從 MQTT 主題讀取訊息並寫入 Pub/Sub。
如果 MQTT 伺服器需要驗證,則包含選用參數 username
和 password
。
如果管道超過 90 分鐘未收到 MQTT 主題的任何訊息,就會發生 StackOverflowError
。
如要解決這個問題,可以每 90 分鐘變更一次工作站數量。
如要進一步瞭解如何在不停止工作的情況下變更工作站數量,請參閱「執行中工作選項更新」。
管道相關規定
- Pub/Sub 輸出主題名稱必須存在。
- MQTT 主機 IP 必須存在,且網路設定必須正確,工作站機器才能連上 MQTT 主機。
- 擷取資料的 MQTT 主題必須有名稱。
範本參數
必要參數
- inputTopic:要從中讀取資料的 MQTT 主題名稱。例如:
topic
。 - outputTopic:要寫入資料的輸出 Pub/Sub 主題名稱。例如:
projects/your-project-id/topics/your-topic-name
。 - 使用者名稱:用於 MQTT 伺服器驗證的使用者名稱。例如:
sampleusername
。 - password:與所提供使用者名稱相關聯的密碼。例如:
samplepassword
。
選用參數
- brokerServer:MQTT 代理程式伺服器 IP 或主機。例如:
tcp://host:1883
。
執行範本
控制台
- 前往 Dataflow 的「Create job from template」(透過範本建立工作) 頁面。 前往「依據範本建立工作」
- 在「工作名稱」欄位中,輸入專屬工作名稱。
- 選用:如要使用區域端點,請從下拉式選單中選取值。預設區域為
us-central1
。如需可執行 Dataflow 工作的地區清單,請參閱「Dataflow 位置」。
- 從「Dataflow template」(Dataflow 範本) 下拉式選單中選取 MQTT to Pub/Sub template。
- 在提供的參數欄位中輸入參數值。
- 按一下「Run Job」(執行工作)。
gcloud
在殼層或終端機中執行範本:
gcloud dataflow flex-template run JOB_NAME \ --project=YOUR_PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Mqtt_to_PubSub \ --parameters \ brokerServer=MQTT_SERVER,\ inputTopic=INPUT_TOPIC,\ outputTopic=OUTPUT_TOPIC,\ username=USERNAME,\ password=PASSWORD
您必須將這個例子中的下列這些值替換掉:
- 將 YOUR_PROJECT_ID 替換為您的專案 ID。
- 請將 替換為 Dataflow 區域名稱。例如:
us-central1
。 - 請將 JOB_NAME 改成您選擇的工作名稱。工作名稱必須符合規則運算式
[a-z]([-a-z0-9]{0,38}[a-z0-9])?
才有效。 - 將 INPUT_TOPIC 替換為 MQTT 伺服器輸入主題的名稱。例如:
testtopic
。 - 將 MQTT_SERVER 替換為 MQTT 伺服器位址。例如:
tcp://10.128.0.62:1883
- 請將 OUTPUT_TOPIC 改成 Pub/Sub 輸出主題名稱。例如:
projects/myproject/topics/testoutput
。 - 將 USERNAME 改為 MQTT 伺服器的使用者名稱。例如:
testuser
。 - 將 PASSWORD 替換為與 MQTT 伺服器搭配使用的使用者名稱對應密碼。
API
如要使用 REST API 執行範本,請傳送 HTTP POST 要求。如要進一步瞭解 API 和授權範圍,請參閱 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "brokerServer": "MQTT_SERVER", "inputTopic": "INPUT_TOPIC", "outputTopic": "OUTPUT_TOPIC", "username": "USERNAME", "password": "PASSWORD" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Mqtt_to_PubSub", } }
您必須將這個例子中的下列這些值替換掉:
- 將 YOUR_PROJECT_ID 替換為您的專案 ID。
- 請將 替換為 Dataflow 區域名稱。例如:
us-central1
。 - 請將 JOB_NAME 改成您選擇的工作名稱。工作名稱必須符合規則運算式
[a-z]([-a-z0-9]{0,38}[a-z0-9])?
才有效。 - 將 INPUT_TOPIC 替換為 MQTT 伺服器輸入主題的名稱。例如:
testtopic
。 - 將 MQTT_SERVER 替換為 MQTT 伺服器位址。例如:
tcp://10.128.0.62:1883
- 請將 OUTPUT_TOPIC 改成 Pub/Sub 輸出主題名稱。例如:
projects/myproject/topics/testoutput
。 - 將 USERNAME 改為 MQTT 伺服器的使用者名稱。例如:
testuser
。 - 將 PASSWORD 替換為與 MQTT 伺服器搭配使用的使用者名稱對應密碼。
後續步驟
- 瞭解 Dataflow 範本。
- 請參閱 Google 提供的範本清單。