En esta página se describen las prácticas recomendadas que debe seguir al desarrollar sus canalizaciones de Dataflow. Si sigues estas prácticas recomendadas, obtendrás las siguientes ventajas:
- Mejorar la observabilidad y el rendimiento de las canalizaciones
- Aumentar la productividad de los desarrolladores
- Mejorar la capacidad de prueba de los flujos de procesamiento
Los ejemplos de código de Apache Beam de esta página usan Java, pero el contenido se aplica a los SDKs de Apache Beam para Java, Python y Go.
Preguntas útiles
Al diseñar tu canalización, ten en cuenta las siguientes preguntas:
- ¿Dónde se almacenan los datos de entrada de tu canalización? ¿Cuántos conjuntos de datos de entrada tienes?
- ¿Qué aspecto tienen tus datos?
- ¿Qué quieres hacer con tus datos?
- ¿Dónde deben ir los datos de salida de la canalización?
- ¿Tu tarea de Dataflow usa Assured Workloads?
Usar plantillas
Para acelerar el desarrollo de flujos de procesamiento, en lugar de crear un flujo de procesamiento escribiendo código de Apache Beam, utiliza una plantilla de Dataflow siempre que sea posible. Las plantillas tienen las siguientes ventajas:
- Las plantillas se pueden reutilizar.
- Las plantillas te permiten personalizar cada trabajo cambiando parámetros específicos de la canalización.
- Cualquier usuario al que concedas permisos podrá usar la plantilla para implementar la canalización. Por ejemplo, un desarrollador puede crear una tarea a partir de una plantilla y un científico de datos de la organización puede implementar esa plantilla más adelante.
Puede usar una plantilla proporcionada por Google o crear una. Algunas plantillas proporcionadas por Google te permiten añadir lógica personalizada como paso de una canalización. Por ejemplo, la plantilla de Pub/Sub a BigQuery proporciona un parámetro para ejecutar una función de JavaScript definida por el usuario (UDF) que se almacena en Cloud Storage.
Como las plantillas proporcionadas por Google son de código abierto y están sujetas a la licencia Apache 2.0, puedes usarlas como base para crear nuevas canalizaciones. Las plantillas también son útiles como ejemplos de código. Consulta el código de la plantilla en el repositorio de GitHub.
Assured Workloads
Assured Workloads ayuda a aplicar los requisitos de seguridad y cumplimiento a los clientes de Google Cloud Platform. Por ejemplo, Regiones de la UE y asistencia con controles de soberanía ayuda a aplicar las garantías de residencia y soberanía de los datos para los clientes de la UE. Para ofrecer estas funciones, algunas funciones de Dataflow están restringidas o limitadas. Si usas Assured Workloads con Dataflow, todos los recursos a los que acceda tu canalización deben estar ubicados en el proyecto o la carpeta de Assured Workloads de tu organización. Estos son algunos de los recursos que ponemos a tu disposición:
- Segmentos de Cloud Storage
- Conjuntos de datos de BigQuery
- Temas y suscripciones de Pub/Sub
- Conjuntos de datos de Firestore
- Conectores de E/S
En Dataflow, en las tareas de streaming creadas después del 7 de marzo del 2024, todos los datos de usuario se encriptan con CMEK.
En los trabajos de streaming creados antes del 7 de marzo del 2024, las claves de datos que se usan en operaciones basadas en claves, como las de ventana, agrupación y unión, no están protegidas con el cifrado CMEK. Para habilitar este cifrado en tus tareas, desvía o cancela la tarea y, a continuación, reiníciala. Para obtener más información, consulta Cifrado de artefactos de estado de la canalización.
Compartir datos entre flujos
No hay ningún mecanismo de comunicación entre flujos de Dataflow específico para compartir datos o contexto de procesamiento entre flujos. Puedes usar un almacenamiento duradero, como Cloud Storage, o una caché en memoria, como App Engine, para compartir datos entre instancias de la canalización.
Programar tareas
Puedes automatizar la ejecución de la canalización de las siguientes formas:
- Usa Cloud Scheduler.
- Usa el operador Dataflow de Apache Airflow, uno de los varios operadores de Google Cloud Platform en un flujo de trabajo de Cloud Composer.
- Ejecuta procesos de trabajos personalizados (cron) en Compute Engine.
Prácticas recomendadas para escribir código de la canalización
En las siguientes secciones se indican las prácticas recomendadas que debe seguir al crear canalizaciones escribiendo código de Apache Beam.
Estructurar el código de Apache Beam
Para crear flujos de procesamiento, es habitual usar la transformación de Apache Beam de procesamiento paralelo genérico
ParDo
.
Cuando aplicas una transformación ParDo
, proporcionas código en forma de objeto DoFn
. DoFn
es una clase del SDK de Apache Beam que define una función de procesamiento distribuido.
Puedes considerar tu código DoFn
como entidades pequeñas e independientes: puede haber muchas instancias ejecutándose en diferentes máquinas, cada una de ellas sin conocimiento de las demás. Por lo tanto, te recomendamos que crees funciones puras, que son ideales para la naturaleza paralela y distribuida de los elementos DoFn
.
Las funciones puras tienen las siguientes características:
- Las funciones puras no dependen de estados ocultos o externos.
- No tienen efectos secundarios observables.
- Son deterministas.
El modelo de función pura no es estrictamente rígido. Cuando tu código no depende de elementos que no garantiza el servicio Dataflow, la información de estado o los datos de inicialización externos pueden ser válidos para DoFn
y otros objetos de función.
Cuando estructures tus ParDo
transformaciones y crees tus elementos DoFn
, ten en cuenta las siguientes directrices:
- Cuando usas el procesamiento exactamente una vez, el servicio Dataflow garantiza que cada elemento de tu
PCollection
de entrada se procese exactamente una vez por una instancia deDoFn
. - El servicio Dataflow no garantiza cuántas veces se invoca un
DoFn
. - El servicio Dataflow no garantiza exactamente cómo se agrupan los elementos distribuidos. No garantiza qué elementos se procesan juntos, si es que se procesa alguno.
- El servicio Dataflow no garantiza el número exacto de instancias
DoFn
creadas a lo largo de un flujo de procesamiento. - El servicio Dataflow es tolerante a fallos y puede volver a intentar ejecutar tu código varias veces si los trabajadores tienen problemas.
- El servicio Dataflow puede crear copias de seguridad de tu código. Pueden producirse problemas con los efectos secundarios manuales, por ejemplo, si tu código depende de archivos temporales con nombres no únicos o los crea.
- El servicio Dataflow serializa el procesamiento de elementos por
DoFn
instancia. No es necesario que tu código sea estrictamente seguro para subprocesos, pero cualquier estado compartido entre varias instancias deDoFn
debe ser seguro para subprocesos.
Crear bibliotecas de transformaciones reutilizables
El modelo de programación de Apache Beam te permite reutilizar transformaciones. Si creas una biblioteca compartida de transformaciones comunes, puedes mejorar la reutilización, la capacidad de prueba y la propiedad del código por parte de diferentes equipos.
Echa un vistazo a los dos ejemplos de código Java siguientes, que leen eventos de pago. Si ambas canalizaciones realizan el mismo procesamiento, pueden usar las mismas transformaciones a través de una biblioteca compartida para los pasos de procesamiento restantes.
El primer ejemplo es de una fuente de Pub/Sub sin límites:
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options)
// Initial read transform
PCollection<PaymentEvent> payments =
p.apply("Read from topic",
PubSubIO.readStrings().withTimestampAttribute(...).fromTopic(...))
.apply("Parse strings into payment events",
ParDo.of(new ParsePaymentEventFn()));
El segundo ejemplo procede de una fuente de base de datos relacional delimitada:
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
PCollection<PaymentEvent> payments =
p.apply(
"Read from database table",
JdbcIO.<PaymentEvent>read()
.withDataSourceConfiguration(...)
.withQuery(...)
.withRowMapper(new RowMapper<PaymentEvent>() {
...
}));
La forma de implementar las prácticas recomendadas de reutilización de código varía en función del lenguaje de programación y de la herramienta de compilación. Por ejemplo, si usas Maven, puedes separar el código de transformación en su propio módulo. Después, puedes incluir el módulo como submódulo en proyectos multimódulo más grandes para diferentes pipelines, como se muestra en el siguiente ejemplo de código:
// Reuse transforms across both pipelines
payments
.apply("ValidatePayments", new PaymentTransforms.ValidatePayments(...))
.apply("ProcessPayments", new PaymentTransforms.ProcessPayments(...))
...
Para obtener más información, consulta las siguientes páginas de documentación de Apache Beam:
- Requisitos para escribir código de usuario para las transformaciones de Apache Beam
- Guía de estilo de
PTransform
: guía de estilo para redactores de nuevas colecciones dePTransform
reutilizables
Usar colas de mensajes fallidos para gestionar errores
A veces, tu canalización no puede procesar elementos. Los problemas con los datos son una causa habitual. Por ejemplo, un elemento que contiene JSON con formato incorrecto puede provocar errores de análisis.
Aunque puedes detectar excepciones en el método DoFn.ProcessElement
, registrar el error y eliminar el elemento, este enfoque hace que se pierdan los datos e impide que se inspeccionen más adelante para gestionarlos manualmente o solucionar problemas.
En su lugar, usa un patrón llamado cola de mensajes fallidos (cola de mensajes no procesados).
Captura las excepciones en el método DoFn.ProcessElement
y registra los errores. En lugar de eliminar el elemento fallido, usa salidas de ramificación para escribir los elementos fallidos en un objeto PCollection
independiente. Estos elementos se escriben en un receptor de datos para inspeccionarlos más adelante
y gestionarlos con una transformación independiente.
En el siguiente ejemplo de código Java se muestra cómo implementar el patrón de cola de mensajes fallidos.
TupleTag<Output> successTag = new TupleTag<>() {};
TupleTag<Input> deadLetterTag = new TupleTag<>() {};
PCollection<Input> input = /* ... */;
PCollectionTuple outputTuple =
input.apply(ParDo.of(new DoFn<Input, Output>() {
@Override
void processElement(ProcessContext c) {
try {
c.output(process(c.element()));
} catch (Exception e) {
LOG.severe("Failed to process input {} -- adding to dead-letter file",
c.element(), e);
c.sideOutput(deadLetterTag, c.element());
}
}).withOutputTags(successTag, TupleTagList.of(deadLetterTag)));
// Write the dead-letter inputs to a BigQuery table for later analysis
outputTuple.get(deadLetterTag)
.apply(BigQueryIO.write(...));
// Retrieve the successful elements...
PCollection<Output> success = outputTuple.get(successTag);
// and continue processing ...
Usa Cloud Monitoring para aplicar diferentes políticas de monitorización y alertas a la cola de mensajes fallidos de tu canalización. Por ejemplo, puede visualizar el número y el tamaño de los elementos procesados por su transformación de mensajes fallidos y configurar alertas para que se activen si se cumplen determinadas condiciones de umbral.
Gestionar mutaciones de esquema
Puedes gestionar los datos que tengan esquemas inesperados pero válidos mediante un patrón de mensajes fallidos, que escribe los elementos fallidos en un objeto PCollection
independiente.
En algunos casos, es recomendable gestionar automáticamente los elementos que reflejan un esquema mutado como elementos válidos. Por ejemplo, si el esquema de un elemento refleja una mutación, como la adición de nuevos campos, puedes adaptar el esquema del receptor de datos para que se ajuste a las mutaciones.
La mutación automática de esquemas se basa en el enfoque de salida de ramificación que usa el patrón de mensajes fallidos. Sin embargo, en este caso, se activa una transformación que muta el esquema de destino cada vez que se encuentran esquemas aditivos. Para ver un ejemplo de este enfoque, consulta Cómo gestionar esquemas JSON mutables en una canalización de streaming con Square Enix en el Google Cloud blog.
Decidir cómo combinar conjuntos de datos
La combinación de conjuntos de datos es un caso práctico habitual de las canalizaciones de datos. Puedes usar entradas laterales o la transformación CoGroupByKey
para realizar combinaciones en tu flujo de trabajo.
Cada una tiene sus ventajas e inconvenientes.
Las entradas secundarias
proporcionan una forma flexible de resolver problemas habituales de procesamiento de datos, como el enriquecimiento de datos y las búsquedas por clave. A diferencia de los objetos PCollection
, las entradas laterales son mutables y se pueden determinar en el tiempo de ejecución. Por ejemplo, los valores de una entrada lateral pueden calcularse en otra rama de tu canalización o determinarse llamando a un servicio remoto.
Dataflow admite entradas secundarias conservando los datos en un almacenamiento persistente, de forma similar a un disco compartido. Con esta configuración, la entrada lateral completa está disponible para todos los trabajadores.
Sin embargo, los tamaños de las entradas secundarias pueden ser muy grandes y es posible que no quepan en la memoria de los trabajadores. Leer de una entrada lateral grande puede provocar problemas de rendimiento si los trabajadores necesitan leer constantemente del almacenamiento persistente.
La transformación CoGroupByKey
es una transformación principal de Apache Beam
que combina (aplana) varios objetos PCollection
y agrupa los elementos que tienen una clave común. A diferencia de una entrada auxiliar, que pone a disposición de cada trabajador todos los datos de la entrada auxiliar, CoGroupByKey
realiza una operación de aleatorización (agrupación) para distribuir los datos entre los trabajadores. Por lo tanto, CoGroupByKey
es ideal cuando los objetos PCollection
que quieres combinar son muy grandes y no caben en la memoria del trabajador.
Sigue estas directrices para decidir si debes usar entradas secundarias o CoGroupByKey
:
- Usa entradas secundarias cuando uno de los objetos
PCollection
a los que te unes sea desproporcionadamente más pequeño que los demás y el objetoPCollection
más pequeño quepa en la memoria de los trabajadores. Al almacenar en caché toda la entrada lateral en la memoria, se pueden obtener elementos de forma rápida y eficiente. - Usa entradas secundarias cuando tengas un objeto
PCollection
que deba unirse varias veces en tu canalización. En lugar de usar varias transformacionesCoGroupByKey
, crea una sola entrada lateral que puedan reutilizar varias transformacionesParDo
. - Usa
CoGroupByKey
si necesitas obtener una gran proporción de un objetoPCollection
que supere significativamente la memoria del trabajador.
Para obtener más información, consulta el artículo Solucionar errores de falta de memoria de Dataflow.
Minimizar las operaciones por elemento costosas
Una instancia de DoFn
procesa lotes de elementos llamados paquetes, que son unidades de trabajo atómicas que constan de cero o más elementos. A continuación, los elementos individuales se procesan mediante el método DoFn.ProcessElement
, que se ejecuta para cada elemento. Como se llama al método DoFn.ProcessElement
por cada elemento, las operaciones que consuman mucho tiempo o recursos computacionales y que se invoquen mediante ese método se ejecutarán por cada elemento procesado por el método.
Si solo necesitas realizar operaciones costosas una vez para un lote de elementos, incluye esas operaciones en el método DoFn.Setup
o en el método DoFn.StartBundle
en lugar de en el elemento DoFn.ProcessElement
. Por ejemplo, las siguientes operaciones:
Analizar un archivo de configuración que controla algún aspecto del comportamiento de la instancia de
DoFn
. Invoca esta acción solo una vez, cuando se inicialice la instancia deDoFn
, mediante el métodoDoFn.Setup
.Crear una instancia de un cliente de corta duración que se reutilice en todos los elementos de un paquete, como cuando todos los elementos del paquete se envían a través de una única conexión de red. Invoca esta acción una vez por paquete mediante el método
DoFn.StartBundle
.
Limitar el tamaño de los lotes y las llamadas simultáneas a servicios externos
Cuando llamas a servicios externos, puedes reducir los costes generales por llamada usando la transformación GroupIntoBatches
. Esta transformación crea lotes de elementos de un tamaño especificado.
El procesamiento por lotes envía elementos a un servicio externo como una sola carga útil en lugar de hacerlo individualmente.
En combinación con el procesamiento por lotes, limita el número máximo de llamadas paralelas (simultáneas) al servicio externo eligiendo las claves adecuadas para particionar los datos entrantes. El número de particiones determina la paralelización máxima. Por ejemplo, si a cada elemento se le asigna la misma clave, una transformación posterior para llamar al servicio externo no se ejecuta en paralelo.
Puedes usar uno de los siguientes métodos para generar claves de elementos:
- Elige un atributo del conjunto de datos que quieras usar como clave de datos, como los IDs de usuario.
- Genera claves de datos para dividir elementos aleatoriamente en un número fijo de particiones, donde el número de valores de clave posibles determina el número de particiones. Debes crear suficientes particiones para el paralelismo.
Cada partición debe tener suficientes elementos para que la transformación
GroupIntoBatches
sea útil.
En el siguiente ejemplo de código Java se muestra cómo dividir elementos aleatoriamente en diez particiones:
// PII or classified data which needs redaction.
PCollection<String> sensitiveData = ...;
int numPartitions = 10; // Number of parallel batches to create.
PCollection<KV<Long, Iterable<String>>> batchedData =
sensitiveData
.apply("Assign data into partitions",
ParDo.of(new DoFn<String, KV<Long, String>>() {
Random random = new Random();
@ProcessElement
public void assignRandomPartition(ProcessContext context) {
context.output(
KV.of(randomPartitionNumber(), context.element()));
}
private static int randomPartitionNumber() {
return random.nextInt(numPartitions);
}
}))
.apply("Create batches of sensitive data",
GroupIntoBatches.<Long, String>ofSize(100L));
// Use batched sensitive data to fully utilize Redaction API,
// which has a rate limit but allows large payloads.
batchedData
.apply("Call Redaction API in batches", callRedactionApiOnBatch());
Identificar problemas de rendimiento causados por pasos fusionados
Dataflow crea un gráfico de pasos que representa tu flujo de procesamiento en función de las transformaciones y los datos que has usado para crearlo. Este gráfico se denomina gráfico de ejecución de la canalización.
Cuando implementas tu flujo de procesamiento, Dataflow puede modificar el gráfico de ejecución de tu flujo para mejorar el rendimiento. Por ejemplo, Dataflow puede fusionar algunas operaciones, un proceso conocido como optimización de la fusión, para evitar el impacto en el rendimiento y el coste de escribir cada objeto PCollection
intermedio en tu flujo de procesamiento.
En algunos casos, Dataflow puede determinar de forma incorrecta la mejor manera de fusionar operaciones en la canalización, lo que puede limitar la capacidad de tu trabajo para usar todos los trabajadores disponibles. En esos casos, puedes evitar que se combinen las operaciones.
Veamos el siguiente ejemplo de código de Apache Beam. Una transformación GenerateSequence
crea un objeto PCollection
pequeño y delimitado, que luego se procesa
con dos transformaciones ParDo
posteriores.
La transformación Find Primes Less-than-N
puede ser costosa a nivel computacional y es probable que se ejecute lentamente con números grandes. Por el contrario, la transformación Increment Number
probablemente se complete rápidamente.
import com.google.common.math.LongMath;
...
public class FusedStepsPipeline {
final class FindLowerPrimesFn extends DoFn<Long, String> {
@ProcessElement
public void processElement(ProcessContext c) {
Long n = c.element();
if (n > 1) {
for (long i = 2; i < n; i++) {
if (LongMath.isPrime(i)) {
c.output(Long.toString(i));
}
}
}
}
}
public static void main(String[] args) {
Pipeline p = Pipeline.create(options);
PCollection<Long> sequence = p.apply("Generate Sequence",
GenerateSequence
.from(0)
.to(1000000));
// Pipeline branch 1
sequence.apply("Find Primes Less-than-N",
ParDo.of(new FindLowerPrimesFn()));
// Pipeline branch 2
sequence.apply("Increment Number",
MapElements.via(new SimpleFunction<Long, Long>() {
public Long apply(Long n) {
return ++n;
}
}));
p.run().waitUntilFinish();
}
}
En el siguiente diagrama se muestra una representación gráfica de la canalización en la interfaz de monitorización de Dataflow.
La interfaz de monitorización de Dataflow muestra que ambas transformaciones tienen la misma velocidad de procesamiento lenta, concretamente 13 elementos por segundo. Puede que esperes que la transformación Increment Number
procese los elementos rápidamente, pero parece que está vinculada a la misma velocidad de procesamiento que Find Primes Less-than-N
.
Esto se debe a que Dataflow ha combinado los pasos en una sola fase, lo que impide que se ejecuten de forma independiente. Puedes usar el comando
gcloud dataflow jobs describe
para obtener más información:
gcloud dataflow jobs describe --full job-id --format json
En el resultado, los pasos combinados se describen en el objeto
ExecutionStageSummary
del array
ComponentTransform
:
...
"executionPipelineStage": [
{
"componentSource": [
...
],
"componentTransform": [
{
"name": "s1",
"originalTransform": "Generate Sequence/Read(BoundedCountingSource)",
"userName": "Generate Sequence/Read(BoundedCountingSource)"
},
{
"name": "s2",
"originalTransform": "Find Primes Less-than-N",
"userName": "Find Primes Less-than-N"
},
{
"name": "s3",
"originalTransform": "Increment Number/Map",
"userName": "Increment Number/Map"
}
],
"id": "S01",
"kind": "PAR_DO_KIND",
"name": "F0"
}
...
En este caso, como la transformación Find Primes Less-than-N
es el paso lento, es una buena estrategia romper la fusión antes de ese paso. Una forma de
descombinar pasos es insertar una
transformación GroupByKey
y desagrupar antes del paso, como se muestra en el siguiente ejemplo de código Java.
sequence
.apply("Map Elements", MapElements.via(new SimpleFunction<Long, KV<Long, Void>>() {
public KV<Long, Void> apply(Long n) {
return KV.of(n, null);
}
}))
.apply("Group By Key", GroupByKey.<Long, Void>create())
.apply("Emit Keys", Keys.<Long>create())
.apply("Find Primes Less-than-N", ParDo.of(new FindLowerPrimesFn()));
También puedes combinar estos pasos de desfusión en una transformación compuesta reutilizable.
Después de desfusionar los pasos, cuando ejecutes la canalización, Increment Number
se completará en cuestión de segundos y la transformación Find Primes Less-than-N
, que tarda mucho más, se ejecutará en una fase independiente.
En este ejemplo se aplica una operación de agrupar y desagrupar a pasos sin fusionar.
Puedes usar otros enfoques en otras circunstancias. En este caso, el tratamiento de la salida duplicada no es un problema, dada la salida consecutiva de la transformación GenerateSequence
.
Los objetos KV
con claves duplicadas se desduplican en una sola clave en la transformación de grupo
(GroupByKey
) y en la transformación de desagrupación
(Keys
). Para conservar los duplicados después de agrupar y desagrupar, crea pares clave-valor siguiendo estos pasos:
- Usa una clave aleatoria y la entrada original como valor.
- Agrupa los datos con la clave aleatoria.
- Emite los valores de cada clave como salida.
También puedes usar una transformación Reshuffle
para evitar que se fusionen las transformaciones circundantes. Sin embargo, los efectos secundarios de la transformación Reshuffle
no se pueden transferir entre diferentes ejecutores de Apache Beam.
Para obtener más información sobre el paralelismo y la optimización de la fusión, consulta Ciclo de vida de una canalización.
Usar métricas de Apache Beam para recoger estadísticas de la canalización
Las métricas de Apache Beam son una clase de utilidad que genera métricas para informar de las propiedades de un flujo de procesamiento en ejecución. Cuando usas Cloud Monitoring, las métricas de Apache Beam están disponibles como métricas personalizadas de Cloud Monitoring.
En el siguiente ejemplo se muestran las métricas de Apache BeamCounter
que se usan en una subclase DoFn
.
El código de ejemplo usa dos contadores. Un contador registra los errores de análisis de JSON (malformedCounter
) y el otro contador registra si el mensaje JSON es válido, pero contiene una carga útil vacía (emptyCounter
). En Cloud Monitoring, los nombres de las métricas personalizadas son custom.googleapis.com/dataflow/malformedJson
y custom.googleapis.com/dataflow/emptyPayload
. Puede usar las métricas personalizadas para crear visualizaciones y políticas de alertas en Cloud Monitoring.
final TupleTag<String> errorTag = new TupleTag<String>(){};
final TupleTag<MockObject> successTag = new TupleTag<MockObject>(){};
final class ParseEventFn extends DoFn<String, MyObject> {
private final Counter malformedCounter = Metrics.counter(ParseEventFn.class, "malformedJson");
private final Counter emptyCounter = Metrics.counter(ParseEventFn.class, "emptyPayload");
private Gson gsonParser;
@Setup
public setup() {
gsonParser = new Gson();
}
@ProcessElement
public void processElement(ProcessContext c) {
try {
MyObject myObj = gsonParser.fromJson(c.element(), MyObject.class);
if (myObj.getPayload() != null) {
// Output the element if non-empty payload
c.output(successTag, myObj);
}
else {
// Increment empty payload counter
emptyCounter.inc();
}
}
catch (JsonParseException e) {
// Increment malformed JSON counter
malformedCounter.inc();
// Output the element to dead-letter queue
c.output(errorTag, c.element());
}
}
}
Más información
En las páginas siguientes se ofrece más información sobre cómo estructurar tu flujo de trabajo, cómo elegir las transformaciones que quieres aplicar a tus datos y qué debes tener en cuenta al elegir los métodos de entrada y salida de tu flujo de trabajo.
- Desarrollar y probar flujos de procesamiento de Dataflow.
- Diseña tu flujo de procesamiento.
- Crea tu flujo de trabajo.
Para obtener más información sobre cómo crear tu código de usuario, consulta los requisitos de las funciones proporcionadas por el usuario.