MQTT to Pub/Sub 範本

「MQTT 到 Pub/Sub」範本是一個串流管道,可從 MQTT 主題讀取訊息並寫入 Pub/Sub。 如果 MQTT 伺服器需要驗證,則包含選用參數 usernamepassword

如果管道超過 90 分鐘未收到 MQTT 主題的任何訊息,就會發生 StackOverflowError。 如要解決這個問題,可以每 90 分鐘變更一次工作站數量。 如要進一步瞭解如何在不停止工作的情況下變更工作站數量,請參閱「執行中工作選項更新」。

管道相關規定

  • Pub/Sub 輸出主題名稱必須存在。
  • MQTT 主機 IP 必須存在,且網路設定必須正確,工作站機器才能連上 MQTT 主機。
  • 擷取資料的 MQTT 主題必須有名稱。

範本參數

必要參數

  • inputTopic:要從中讀取資料的 MQTT 主題名稱。例如:topic
  • outputTopic:要寫入資料的輸出 Pub/Sub 主題名稱。例如:projects/your-project-id/topics/your-topic-name
  • 使用者名稱:用於 MQTT 伺服器驗證的使用者名稱。例如:sampleusername
  • password:與所提供使用者名稱相關聯的密碼。例如:samplepassword

選用參數

  • brokerServer:MQTT 代理程式伺服器 IP 或主機。例如:tcp://host:1883

執行範本

控制台

  1. 前往 Dataflow 的「Create job from template」(透過範本建立工作) 頁面。
  2. 前往「依據範本建立工作」
  3. 在「工作名稱」欄位中,輸入專屬工作名稱。
  4. 選用:如要使用區域端點,請從下拉式選單中選取值。預設區域為 us-central1

    如需可執行 Dataflow 工作的地區清單,請參閱「Dataflow 位置」。

  5. 從「Dataflow template」(Dataflow 範本) 下拉式選單中選取 MQTT to Pub/Sub template。
  6. 在提供的參數欄位中輸入參數值。
  7. 按一下「Run Job」(執行工作)

gcloud

在殼層或終端機中執行範本:

gcloud dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Mqtt_to_PubSub \
    --parameters \
brokerServer=MQTT_SERVER,\
inputTopic=INPUT_TOPIC,\
outputTopic=OUTPUT_TOPIC,\
username=USERNAME,\
password=PASSWORD
  

您必須將這個例子中的下列這些值替換掉:

  • YOUR_PROJECT_ID 替換為您的專案 ID。
  • 請將 替換為 Dataflow 區域名稱。例如:us-central1
  • 請將 JOB_NAME 改成您選擇的工作名稱。工作名稱必須符合規則運算式 [a-z]([-a-z0-9]{0,38}[a-z0-9])? 才有效。
  • INPUT_TOPIC 替換為 MQTT 伺服器輸入主題的名稱。例如:testtopic
  • MQTT_SERVER 替換為 MQTT 伺服器位址。例如:tcp://10.128.0.62:1883
  • 請將 OUTPUT_TOPIC 改成 Pub/Sub 輸出主題名稱。例如:projects/myproject/topics/testoutput
  • USERNAME 改為 MQTT 伺服器的使用者名稱。例如:testuser
  • PASSWORD 替換為與 MQTT 伺服器搭配使用的使用者名稱對應密碼。

API

如要使用 REST API 執行範本,請傳送 HTTP POST 要求。如要進一步瞭解 API 和授權範圍,請參閱 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "brokerServer": "MQTT_SERVER",
          "inputTopic": "INPUT_TOPIC",
          "outputTopic": "OUTPUT_TOPIC",
          "username": "USERNAME",
          "password": "PASSWORD"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Mqtt_to_PubSub",
   }
}
  

您必須將這個例子中的下列這些值替換掉:

  • YOUR_PROJECT_ID 替換為您的專案 ID。
  • 請將 替換為 Dataflow 區域名稱。例如:us-central1
  • 請將 JOB_NAME 改成您選擇的工作名稱。工作名稱必須符合規則運算式 [a-z]([-a-z0-9]{0,38}[a-z0-9])? 才有效。
  • INPUT_TOPIC 替換為 MQTT 伺服器輸入主題的名稱。例如:testtopic
  • MQTT_SERVER 替換為 MQTT 伺服器位址。例如:tcp://10.128.0.62:1883
  • 請將 OUTPUT_TOPIC 改成 Pub/Sub 輸出主題名稱。例如:projects/myproject/topics/testoutput
  • USERNAME 改為 MQTT 伺服器的使用者名稱。例如:testuser
  • PASSWORD 替換為與 MQTT 伺服器搭配使用的使用者名稱對應密碼。

後續步驟