Organiza tus páginas con colecciones
Guarda y categoriza el contenido según tus preferencias.
La plantilla de Pub/Sub a Pub/Sub es una canalización de transmisión que lee mensajes de una suscripción de Pub/Sub y los escribe en otro tema de Pub/Sub. La canalización también acepta una clave de atributo de mensaje opcional y un valor que se puede usar para filtrar los mensajes que se deben escribir en el tema de Pub/Sub. Puedes usar esta plantilla para copiar mensajes de una suscripción de Pub/Sub a otro tema de Pub/Sub con un filtro de mensajes opcional.
Requisitos de la canalización
La suscripción a Pub/Sub de origen debe existir antes de la ejecución.
El tema de Pub/Sub de destino debe existir antes de la ejecución.
Parámetros de la plantilla
Parámetros obligatorios
inputSubscription: El tema de Pub/Sub desde el que se lee la entrada. (Ejemplo: projects/your-project-id/subscriptions/your-subscription-name).
outputTopic: El tema de Pub/Sub en el que se escribirá el resultado. (Ejemplo: projects/your-project-id/topics/your-topic-name).
Parámetros opcionales
filterKey: La clave de atributo que se usará para filtrar eventos. No se aplican filtros si no se especifica filterKey.
filterValue: El valor del atributo que se usará para filtrar eventos cuando se proporciona un filterKey. De forma predeterminada, se usa un filterValue nulo.
Ejecuta la plantilla
Console
Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
STAGING_LOCATION: la ubicación para los archivos locales de etapa de pruebas (por ejemplo, gs://your-bucket/staging).
SUBSCRIPTION_NAME: Es el nombre de la suscripción a Pub/Sub.
TOPIC_NAME: Es el nombre del tema de Pub/Sub.
FILTER_KEY: Es la clave de atributo que se usa para filtrar los eventos. No se aplicará ningún filtro si no se especifica una clave.
FILTER_VALUE: Es el valor del atributo del filtro que se debe usar si se proporciona una clave de filtro de evento.
Acepta una string de regex de Java válida como valor de filtro de evento. En caso de que se proporcione una regex, la expresión completa debe coincidir para que el mensaje se filtre. Las coincidencias parciales (como una substring) no se filtran. De forma predeterminada, se usa un valor de filtro de evento nulo.
API
Para ejecutar la plantilla con la API de REST, envía una solicitud POST HTTP. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch.
el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
STAGING_LOCATION: la ubicación para los archivos locales de etapa de pruebas (por ejemplo, gs://your-bucket/staging).
SUBSCRIPTION_NAME: Es el nombre de la suscripción a Pub/Sub.
TOPIC_NAME: Es el nombre del tema de Pub/Sub.
FILTER_KEY: Es la clave de atributo que se usa para filtrar los eventos. No se aplicará ningún filtro si no se especifica una clave.
FILTER_VALUE: Es el valor del atributo del filtro que se debe usar si se proporciona una clave de filtro de evento.
Acepta una string de regex de Java válida como valor de filtro de evento. En caso de que se proporcione una regex, la expresión completa debe coincidir para que el mensaje se filtre. Las coincidencias parciales (como una substring) no se filtran. De forma predeterminada, se usa un valor de filtro de evento nulo.
Código fuente de la plantilla
Java
/*
* Copyright (C) 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import com.google.auto.value.AutoValue;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.templates.PubsubToPubsub.Options;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A template that copies messages from one Pubsub subscription to another Pubsub topic. Allows
* filtering specific messages.
*
* <p>Check out <a
* href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_Cloud_PubSub_to_Cloud_PubSub.md">README</a>
* for instructions on how to use or modify this template.
*/
@Template(
name = "Cloud_PubSub_to_Cloud_PubSub",
category = TemplateCategory.STREAMING,
displayName = "Pub/Sub to Pub/Sub",
description =
"The Pub/Sub to Pub/Sub template is a streaming pipeline that reads messages from a Pub/Sub subscription and "
+ "writes the messages to another Pub/Sub topic. The pipeline also accepts an optional message attribute key and a value that can be used to filter the messages that should be written to the Pub/Sub topic. You can use this template to copy messages from a Pub/Sub subscription to another Pub/Sub topic with an optional message filter.",
optionsClass = Options.class,
documentation =
"https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-to-pubsub",
contactInformation = "https://cloud.google.com/support",
requirements = {
"The source Pub/Sub subscription must exist prior to execution.",
"The source Pub/Sub subscription must be a <a href=\"https://cloud.google.com/pubsub/docs/pull\">pull subscription</a>.",
"The destination Pub/Sub topic must exist prior to execution."
},
streaming = true,
supportsAtLeastOnce = true,
supportsExactlyOnce = true)
public class PubsubToPubsub {
/**
* Main entry point for executing the pipeline.
*
* @param args The command-line arguments to the pipeline.
*/
public static void main(String[] args) {
// Parse the user options passed from the command-line
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
options.setStreaming(true);
run(options);
}
/**
* Runs the pipeline with the supplied options.
*
* @param options The execution parameters to the pipeline.
* @return The result of the pipeline execution.
*/
public static PipelineResult run(Options options) {
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
/*
Steps:
<p>1) Read PubSubMessage with attributes from input PubSub subscription.
<p>2) Apply any filters if an attribute=value pair is provided.
<p>3) Write each PubSubMessage to output PubSub topic.
*/
pipeline
.apply(
"Read PubSub Events",
PubsubIO.readMessagesWithAttributes().fromSubscription(options.getInputSubscription()))
.apply(
"Filter Events If Enabled",
ParDo.of(
ExtractAndFilterEventsFn.newBuilder()
.withFilterKey(options.getFilterKey())
.withFilterValue(options.getFilterValue())
.build()))
.apply("Write PubSub Events", PubsubIO.writeMessages().to(options.getOutputTopic()));
// Execute the pipeline and return the result.
return pipeline.run();
}
/**
* Options supported by {@link PubsubToPubsub}.
*
* <p>Inherits standard configuration options.
*/
public interface Options extends PipelineOptions, StreamingOptions {
@TemplateParameter.PubsubSubscription(
order = 1,
groupName = "Source",
description = "Pub/Sub input subscription",
helpText = "The Pub/Sub subscription to read the input from.",
example = "projects/your-project-id/subscriptions/your-subscription-name")
@Validation.Required
ValueProvider<String> getInputSubscription();
void setInputSubscription(ValueProvider<String> inputSubscription);
@TemplateParameter.PubsubTopic(
order = 2,
groupName = "Target",
description = "Output Pub/Sub topic",
helpText = "The Pub/Sub topic to write the output to.",
example = "projects/your-project-id/topics/your-topic-name")
@Validation.Required
ValueProvider<String> getOutputTopic();
void setOutputTopic(ValueProvider<String> outputTopic);
@TemplateParameter.Text(
order = 3,
optional = true,
description = "Event filter key",
helpText =
"The attribute key to use to filter events. No filters are applied if `filterKey` is not specified.")
ValueProvider<String> getFilterKey();
void setFilterKey(ValueProvider<String> filterKey);
@TemplateParameter.Text(
order = 4,
optional = true,
description = "Event filter value",
helpText =
"The attribute value to use to filter events when a `filterKey` is provided. By default, a null `filterValue` is used.")
ValueProvider<String> getFilterValue();
void setFilterValue(ValueProvider<String> filterValue);
}
/**
* DoFn that will determine if events are to be filtered. If filtering is enabled, it will only
* publish events that pass the filter else, it will publish all input events.
*/
@AutoValue
public abstract static class ExtractAndFilterEventsFn extends DoFn<PubsubMessage, PubsubMessage> {
private static final Logger LOG = LoggerFactory.getLogger(ExtractAndFilterEventsFn.class);
// Counter tracking the number of incoming Pub/Sub messages.
private static final Counter INPUT_COUNTER =
Metrics.counter(ExtractAndFilterEventsFn.class, "inbound-messages");
// Counter tracking the number of output Pub/Sub messages after the user provided filter
// is applied.
private static final Counter OUTPUT_COUNTER =
Metrics.counter(ExtractAndFilterEventsFn.class, "filtered-outbound-messages");
private Boolean doFilter;
private String inputFilterKey;
private Pattern inputFilterValueRegex;
private Boolean isNullFilterValue;
public static Builder newBuilder() {
return new AutoValue_PubsubToPubsub_ExtractAndFilterEventsFn.Builder();
}
@Nullable
abstract ValueProvider<String> filterKey();
@Nullable
abstract ValueProvider<String> filterValue();
@Setup
public void setup() {
if (this.doFilter != null) {
return; // Filter has been evaluated already
}
inputFilterKey = (filterKey() == null ? null : filterKey().get());
if (inputFilterKey == null) {
// Disable input message filtering.
this.doFilter = false;
} else {
this.doFilter = true; // Enable filtering.
String inputFilterValue = (filterValue() == null ? null : filterValue().get());
if (inputFilterValue == null) {
LOG.warn(
"User provided a NULL for filterValue. Only messages with a value of NULL for the"
+ " filterKey: {} will be filtered forward",
inputFilterKey);
// For backward compatibility, we are allowing filtering by null filterValue.
this.isNullFilterValue = true;
this.inputFilterValueRegex = null;
} else {
this.isNullFilterValue = false;
try {
inputFilterValueRegex = getFilterPattern(inputFilterValue);
} catch (PatternSyntaxException e) {
LOG.error("Invalid regex pattern for supplied filterValue: {}", inputFilterValue);
throw new RuntimeException(e);
}
}
LOG.info(
"Enabling event filter [key: " + inputFilterKey + "][value: " + inputFilterValue + "]");
}
}
@ProcessElement
public void processElement(ProcessContext context) {
INPUT_COUNTER.inc();
if (!this.doFilter) {
// Filter is not enabled
writeOutput(context, context.element());
} else {
PubsubMessage message = context.element();
String extractedValue = message.getAttribute(this.inputFilterKey);
if (this.isNullFilterValue) {
if (extractedValue == null) {
// If we are filtering for null and the extracted value is null, we forward
// the message.
writeOutput(context, message);
}
} else {
if (extractedValue != null
&& this.inputFilterValueRegex.matcher(extractedValue).matches()) {
// If the extracted value is not null and it matches the filter,
// we forward the message.
writeOutput(context, message);
}
}
}
}
/**
* Write a {@link PubsubMessage} and increment the output counter.
*
* @param context {@link ProcessContext} to write {@link PubsubMessage} to.
* @param message {@link PubsubMessage} output.
*/
private void writeOutput(ProcessContext context, PubsubMessage message) {
OUTPUT_COUNTER.inc();
context.output(message);
}
/**
* Return a {@link Pattern} based on a user provided regex string.
*
* @param regex Regex string to compile.
* @return {@link Pattern}
* @throws PatternSyntaxException If the string is an invalid regex.
*/
private Pattern getFilterPattern(String regex) throws PatternSyntaxException {
checkNotNull(regex, "Filter regex cannot be null.");
return Pattern.compile(regex);
}
/** Builder class for {@link ExtractAndFilterEventsFn}. */
@AutoValue.Builder
abstract static class Builder {
abstract Builder setFilterKey(ValueProvider<String> filterKey);
abstract Builder setFilterValue(ValueProvider<String> filterValue);
abstract ExtractAndFilterEventsFn build();
/**
* Method to set the filterKey used for filtering messages.
*
* @param filterKey Lookup key for the {@link PubsubMessage} attribute map.
* @return {@link Builder}
*/
public Builder withFilterKey(ValueProvider<String> filterKey) {
checkArgument(filterKey != null, "withFilterKey(filterKey) called with null input.");
return setFilterKey(filterKey);
}
/**
* Method to set the filterValue used for filtering messages.
*
* @param filterValue Lookup value for the {@link PubsubMessage} attribute map.
* @return {@link Builder}
*/
public Builder withFilterValue(ValueProvider<String> filterValue) {
checkArgument(filterValue != null, "withFilterValue(filterValue) called with null input.");
return setFilterValue(filterValue);
}
}
}
}