Organiza tus páginas con colecciones
Guarda y categoriza el contenido según tus preferencias.
La plantilla de MQTT a Pub/Sub es una canalización de transmisión que lee mensajes de un tema de MQTT y los escribe en Pub/Sub.
Incluye los parámetros opcionales username y password en caso de que el servidor MQTT requiera la autenticación.
Si la canalización no recibe ningún mensaje del tema de MQTT durante más de 90 minutos, se produce un StackOverflowError.
Como solución alternativa, puedes cambiar la cantidad de trabajadores cada 90 minutos.
Para obtener más información acerca de cómo cambiar la cantidad de trabajadores sin detener tu trabajo,
consulta Actualización de la opción de trabajo en tránsito.
Requisitos de la canalización
El nombre del tema de salida de Pub/Sub debe existir.
La IP del host de MQTT debe existir y tener la configuración de red adecuada para que las máquinas de trabajador lleguen al host de MQTT.
El tema MQTT del que se extraen los datos debe tener un nombre.
Parámetros de la plantilla
Parámetros obligatorios
inputTopic: El nombre del tema MQTT del que se leen los datos. (Ejemplo: tema).
outputTopic: El nombre del tema de Pub/Sub de salida en el que se escriben los datos. (Ejemplo: projects/your-project-id/topics/your-topic-name).
username: El nombre de usuario para usar en la autenticación en el servidor de MQTT (opcional). (Ejemplo: nombredeusuariodemuestra).
password: la contraseña asociada con el nombre de usuario proporcionado (opcional). (Ejemplo: contraseñademuestra).
Parámetros opcionales
brokerServer: La IP o el host del servidor del agente de MQTT. (Por ejemplo: tcp://host:1883).
Ejecuta la plantilla
Consola
Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
En este ejemplo, debes reemplazar los siguientes valores:
Reemplaza YOUR_PROJECT_ID por el ID del proyecto.
Reemplaza por el nombre de la región de Dataflow. Por ejemplo: us-central1.
Reemplaza JOB_NAME por un nombre de trabajo a elección. El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
Reemplaza INPUT_TOPIC por el nombre del tema de entrada del servidor MQTT. Por ejemplo: testtopic.
Reemplaza MQTT_SERVER por las direcciones del servidor MQTT. Por ejemplo: tcp://10.128.0.62:1883
Reemplaza OUTPUT_TOPIC por el nombre del tema de salida de Pub/Sub. Por ejemplo: projects/myproject/topics/testoutput.
Reemplaza USERNAME por el nombre de usuario para el servidor MQTT. Por ejemplo: testuser.
Reemplaza PASSWORD por la contraseña que corresponde al nombre de usuario usado con el servidor MQTT.
API
Para ejecutar la plantilla con la API de REST, envía una solicitud POST HTTP. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch.
En este ejemplo, debes reemplazar los siguientes valores:
Reemplaza YOUR_PROJECT_ID por el ID del proyecto.
Reemplaza por el nombre de la región de Dataflow. Por ejemplo: us-central1.
Reemplaza JOB_NAME por un nombre de trabajo a elección. El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
Reemplaza INPUT_TOPIC por el nombre del tema de entrada del servidor MQTT. Por ejemplo: testtopic.
Reemplaza MQTT_SERVER por las direcciones del servidor MQTT. Por ejemplo: tcp://10.128.0.62:1883
Reemplaza OUTPUT_TOPIC por el nombre del tema de salida de Pub/Sub. Por ejemplo: projects/myproject/topics/testoutput.
Reemplaza USERNAME por el nombre de usuario para el servidor MQTT. Por ejemplo: testuser.
Reemplaza PASSWORD por la contraseña que corresponde al nombre de usuario usado con el servidor MQTT.
Código fuente de la plantilla
Java
/*
* Copyright (C) 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.mqtt.MqttIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
/**
* Dataflow template which reads data from Mqtt Topic and writes it to Cloud PubSub.
*
* <p>Check out <a
* href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/mqtt-to-pubsub/README_Mqtt_to_PubSub.md">README</a>
* for instructions on how to use or modify this template.
*/
@Template(
name = "Mqtt_to_PubSub",
category = TemplateCategory.STREAMING,
displayName = "MQTT to Pubsub",
description =
"The MQTT to Pub/Sub template is a streaming pipeline that reads messages from an MQTT topic and writes them to Pub/Sub. "
+ "It includes the optional parameters <code>username</code> and <code>password</code> in case authentication is required by the MQTT server.",
optionsClass = MqttToPubsub.MqttToPubsubOptions.class,
flexContainerName = "mqtt-to-pubsub",
contactInformation = "https://cloud.google.com/support",
documentation =
"https://cloud.google.com/dataflow/docs/guides/templates/provided/mqtt-to-pubsub",
preview = true,
requirements = {
"The Pub/Sub output topic name must exist.",
"The MQTT host IP must exist and have the proper network configuration for worker machines to reach the MQTT host.",
"The MQTT topic that data is extracted from must have a name."
},
streaming = true,
supportsAtLeastOnce = true)
public class MqttToPubsub {
/**
* Runs a pipeline which reads data from Mqtt topic and writes it to Cloud PubSub.
*
* @param args arguments to the pipeline
*/
public static void main(String[] args) {
MqttToPubsubOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(MqttToPubsubOptions.class);
run(options);
}
public static void validate(MqttToPubsubOptions options) {
if (options != null) {
if ((options.getUsername() != null && !options.getUsername().isEmpty())
&& (options.getPassword() == null || options.getPassword().isBlank())) {
throw new IllegalArgumentException(
"While username is provided, password is required for authentication");
}
}
}
public static PipelineResult run(MqttToPubsubOptions options) {
validate(options);
Pipeline pipeline = Pipeline.create(options);
MqttIO.Read mqttIo;
if (!options.getUsername().isEmpty() || !options.getPassword().isBlank()) {
mqttIo =
MqttIO.read()
.withConnectionConfiguration(
MqttIO.ConnectionConfiguration.create(
options.getBrokerServer(), options.getInputTopic())
.withUsername(options.getUsername())
.withPassword(options.getPassword()));
} else {
mqttIo =
MqttIO.read()
.withConnectionConfiguration(
MqttIO.ConnectionConfiguration.create(
options.getBrokerServer(), options.getInputTopic()));
}
return pipeline
.apply("ReadFromMqttTopic", mqttIo)
.apply(ParDo.of(new ByteToStringTransform()))
.apply("WriteToPubSubTopic", PubsubIO.writeStrings().to(options.getOutputTopic()))
.getPipeline()
.run();
}
static class ByteToStringTransform extends DoFn<byte[], String> {
@ProcessElement
public void processElement(@Element byte[] word, OutputReceiver<String> out) {
out.output(new String(word, StandardCharsets.UTF_8));
}
}
/**
* The {@link MqttToPubsubOptions} interface provides the custom execution options passed by the
* executor at the command-line.
*/
public interface MqttToPubsubOptions extends PipelineOptions {
@TemplateParameter.Text(
order = 1,
groupName = "Source",
optional = true,
regexes = {"[,\\/:a-zA-Z0-9._-]+"},
description = "MQTT Broker IP",
helpText = "The MQTT broker server IP or host.",
example = "tcp://host:1883")
@Validation.Required
String getBrokerServer();
void setBrokerServer(String brokerServer);
@TemplateParameter.Text(
order = 2,
groupName = "Source",
optional = false,
regexes = {"[\\/a-zA-Z0-9._-]+"},
description = "MQTT topic(s) to read the input from",
helpText = "The name of the MQTT topic that data is read from.",
example = "topic")
@Validation.Required
String getInputTopic();
void setInputTopic(String inputTopics);
@TemplateParameter.PubsubTopic(
order = 3,
groupName = "Target",
description = "Output Pub/Sub topic",
helpText = "The name of the output Pub/Sub topic that data is written to.",
example = "projects/your-project-id/topics/your-topic-name")
@Validation.Required
String getOutputTopic();
void setOutputTopic(String outputTopic);
@TemplateParameter.Text(
order = 4,
description = "MQTT Username",
helpText = "The username to use for authentication on the MQTT server.",
example = "sampleusername")
String getUsername();
void setUsername(String username);
@TemplateParameter.Password(
order = 5,
description = "MQTT Password",
helpText = "The password associated with the provided username.",
example = "samplepassword")
String getPassword();
void setPassword(String password);
}
}