從 Dataflow 寫入 Pub/Sub

本文說明如何使用 Apache Beam PubSubIO I/O 連接器,將 Dataflow 中的文字資料寫入 Pub/Sub。

總覽

如要將資料寫入 Pub/Sub,請使用 PubSubIO 連接器。輸入元素可以是 Pub/Sub 訊息,也可以只是訊息資料。如果輸入元素是 Pub/Sub 訊息,您也可以選擇在每則訊息中設定屬性或排序鍵。

您可以使用 Java、Python 或 Go 版本的 PubSubIO 連接器,如下所示:

Java

如要寫入單一主題,請呼叫 PubsubIO.writeMessages 方法。這個方法會採用 PubsubMessage 物件的輸入集合。連接器也會定義便利方法,用於編寫字串、二進位編碼的 Avro 訊息或二進位編碼的 protobuf 訊息。這些方法會將輸入的集合轉換為 Pub/Sub 訊息。

如要根據輸入資料寫入動態主題集,請呼叫 writeMessagesDynamic。呼叫訊息上的 PubsubMessage.withTopic,為每則訊息指定目的地主題。舉例來說,您可以根據輸入資料中特定欄位的值,將訊息轉送至不同主題。

詳情請參閱PubsubIO參考說明文件。

Python

呼叫 pubsub.WriteToPubSub 方法。 根據預設,這個方法會採用 bytes 類型的輸入集合,代表訊息酬載。如果 with_attributes 參數是 True,這個方法會採用 PubsubMessage 物件的集合。

詳情請參閱 pubsub 模組參考說明文件。

Go

如要將資料寫入 Pub/Sub,請呼叫 pubsubio.Write 方法。這個方法會接收 PubSubMessage 物件或包含訊息酬載的位元組切片輸入集合。

詳情請參閱 pubsubio 套件參考說明文件。

如要進一步瞭解 Pub/Sub 訊息,請參閱 Pub/Sub 說明文件中的「訊息格式」。

時間戳記

Pub/Sub 會為每則訊息設定時間戳記。這個時間戳記代表訊息發布至 Pub/Sub 的時間。在串流情境中,您可能也會在意「事件」時間戳記,也就是產生訊息資料的時間。您可以使用 Apache Beam 元素時間戳記來表示事件時間。建立不受限 PCollection 的來源通常會為每個新元素指派與事件時間相符的時間戳記。

如果是 Java 和 Python,Pub/Sub I/O 連接器可將每個元素的時間戳記寫為 Pub/Sub 訊息屬性。訊息消費者可以使用這項屬性取得事件時間戳記。

Java

呼叫 PubsubIO.Write<T>.withTimestampAttribute 並指定屬性的名稱。

Python

呼叫 WriteToPubSub 時,請指定 timestamp_attribute 參數。

郵件傳送

Dataflow 支援僅處理一次管道中的訊息。不過,Pub/Sub I/O 連接器無法保證透過 Pub/Sub 傳送的訊息只會傳送一次。

如果是 Java 和 Python,您可以設定 Pub/Sub I/O 連接器,將每個元素的專屬 ID 寫入為訊息屬性。訊息消費者隨後可以使用這項屬性來刪除重複訊息。

Java

呼叫 PubsubIO.Write<T>.withIdAttribute 並指定屬性的名稱。

Python

呼叫 WriteToPubSub 時,請指定 id_label 參數。

直接輸出

如果您在管道中啟用至少一次串流模式,I/O 連接器就會使用直接輸出。在這個模式中,連接器不會檢查點訊息,因此寫入速度較快。不過, 在此模式下重試可能會導致訊息重複,且訊息 ID 不同, 可能導致訊息消費者更難將訊息去重複。

如要為使用「只傳送一次」模式的管道啟用直接輸出,請設定 streaming_enable_pubsub_direct_output 服務選項。直接輸出可縮短寫入延遲時間,並提高處理效率。如果訊息消費者可以處理具有非專屬訊息 ID 的重複訊息,建議您採用這個選項。

範例

下列範例會建立 PCollection 的 Pub/Sub 訊息,並將這些訊息寫入 Pub/Sub 主題。主題會指定為管道選項。每則訊息都包含酬載資料和一組屬性。

Java

如要向 Dataflow 進行驗證,請設定應用程式預設憑證。 詳情請參閱「為本機開發環境設定驗證」。

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;



public class PubSubWriteWithAttributes {
  public interface Options extends PipelineOptions {
    @Description("The Pub/Sub topic to write to. Format: projects/<PROJECT>/topics/<TOPIC>")
    String getTopic();

    void setTopic(String value);
  }

  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  static class ExampleData {
    public String name;
    public String product;
    public Long timestamp; // Epoch time in milliseconds

    public ExampleData() {}

    public ExampleData(String name, String product, Long timestamp) {
      this.name = name;
      this.product = product;
      this.timestamp = timestamp;
    }
  }

  // Write messages to a Pub/Sub topic.
  public static void main(String[] args) {
    // Example source data.
    final List<ExampleData> messages = Arrays.asList(
        new ExampleData("Robert", "TV", 1613141590000L),
        new ExampleData("Maria", "Phone", 1612718280000L),
        new ExampleData("Juan", "Laptop", 1611618000000L),
        new ExampleData("Rebeca", "Videogame", 1610000000000L)
    );

    // Parse the pipeline options passed into the application. Example:
    //   --runner=DirectRunner --topic=projects/MY_PROJECT/topics/MY_TOPIC"
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    var pipeline = Pipeline.create(options);
    pipeline
        // Create some data to write to Pub/Sub.
        .apply(Create.of(messages))
        // Convert the data to Pub/Sub messages.
        .apply(MapElements
            .into(TypeDescriptor.of(PubsubMessage.class))
            .via((message -> {
              byte[] payload = message.product.getBytes(StandardCharsets.UTF_8);
              // Create attributes for each message.
              HashMap<String, String> attributes = new HashMap<String, String>();
              attributes.put("buyer", message.name);
              attributes.put("timestamp", Long.toString(message.timestamp));
              return new PubsubMessage(payload, attributes);
            })))
        // Write the messages to Pub/Sub.
        .apply(PubsubIO.writeMessages().to(options.getTopic()));
    pipeline.run().waitUntilFinish();
  }
}

Python

如要向 Dataflow 進行驗證,請設定應用程式預設憑證。 詳情請參閱「為本機開發環境設定驗證」。

import argparse
from typing import Any, Dict, List

import apache_beam as beam
from apache_beam.io import PubsubMessage
from apache_beam.io import WriteToPubSub
from apache_beam.options.pipeline_options import PipelineOptions

from typing_extensions import Self


def item_to_message(item: Dict[str, Any]) -> PubsubMessage:
    # Re-import needed types. When using the Dataflow runner, this
    # function executes on a worker, where the global namespace is not
    # available. For more information, see:
    # https://cloud.google.com/dataflow/docs/guides/common-errors#name-error
    from apache_beam.io import PubsubMessage

    attributes = {"buyer": item["name"], "timestamp": str(item["ts"])}
    data = bytes(item["product"], "utf-8")

    return PubsubMessage(data=data, attributes=attributes)


def write_to_pubsub(argv: List[str] = None) -> None:
    # Parse the pipeline options passed into the application. Example:
    #     --topic=$TOPIC_PATH --streaming
    # For more information, see
    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    class MyOptions(PipelineOptions):
        @classmethod
        # Define a custom pipeline option to specify the Pub/Sub topic.
        def _add_argparse_args(cls: Self, parser: argparse.ArgumentParser) -> None:
            parser.add_argument("--topic", required=True)

    example_data = [
        {"name": "Robert", "product": "TV", "ts": 1613141590000},
        {"name": "Maria", "product": "Phone", "ts": 1612718280000},
        {"name": "Juan", "product": "Laptop", "ts": 1611618000000},
        {"name": "Rebeca", "product": "Video game", "ts": 1610000000000},
    ]
    options = MyOptions()

    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | "Create elements" >> beam.Create(example_data)
            | "Convert to Pub/Sub messages" >> beam.Map(item_to_message)
            | WriteToPubSub(topic=options.topic, with_attributes=True)
        )

    print("Pipeline ran successfully.")