Membaca dari Apache Kafka ke Dataflow

Dokumen ini menjelaskan cara membaca data dari Apache Kafka ke Dataflow dan mencakup tips performa serta praktik terbaik.

Untuk sebagian besar kasus penggunaan, pertimbangkan untuk menggunakan konektor Managed I/O untuk membaca dari Kafka.

Jika Anda memerlukan penyesuaian performa yang lebih canggih, pertimbangkan untuk menggunakan konektor KafkaIO. Konektor KafkaIO tersedia untuk Java atau dengan menggunakan framework pipeline multi-bahasa untuk Python dan Go.

Keparalelan

Paralelisme dibatasi oleh dua faktor: jumlah maksimum pekerja (max_num_workers) dan jumlah partisi Kafka. Dataflow secara default menggunakan fanout paralelisme 4 x max_num_workers. Namun, fanout dibatasi oleh jumlah partisi. Misalnya, jika 100 vCPU tersedia, tetapi pipeline hanya membaca dari 10 partisi Kafka, paralelisme maksimum adalah 10.

Untuk memaksimalkan paralelisme, sebaiknya miliki minimal 4 x partisi Kafka max_num_workers. Jika tugas Anda menggunakan Runner v2, pertimbangkan untuk menyetel paralelisme yang lebih tinggi. Titik awal yang baik adalah memiliki partisi yang sama dengan dua kali jumlah vCPU pekerja.

Jika Anda tidak dapat meningkatkan jumlah partisi, Anda dapat meningkatkan paralelisme dengan memanggil KafkaIO.Read.withRedistribute. Metode ini menambahkan transformasi Redistribute ke pipeline, yang memberikan petunjuk kepada Dataflow untuk mendistribusikan ulang dan memparalelkan data secara lebih efisien. Anda juga dapat menentukan jumlah optimal shard yang akan digunakan dalam langkah redistribusi, dengan memanggil KafkaIO.Read.withRedistributeNumKeys. Dataflow memperlakukan nilai ini sebagai petunjuk pengoptimalan. Mendistribusikan ulang data akan menambah beberapa overhead tambahan untuk melakukan langkah pengacakan. Untuk mengetahui informasi selengkapnya, lihat Mencegah penggabungan.

Coba pastikan beban antarpartisi relatif merata dan tidak miring. Jika beban tidak seimbang, hal ini dapat menyebabkan pemanfaatan pekerja yang buruk. Worker yang membaca dari partisi dengan beban yang lebih ringan mungkin relatif tidak aktif, sedangkan worker yang membaca dari partisi dengan beban berat mungkin tertinggal. Dataflow menyediakan metrik untuk backlog per partisi.

Jika beban tidak seimbang, penyeimbangan kerja dinamis dapat membantu mendistribusikan pekerjaan. Misalnya, Dataflow dapat mengalokasikan satu worker untuk membaca dari beberapa partisi bervolume rendah, dan mengalokasikan worker lain untuk membaca dari satu partisi bervolume tinggi. Namun, dua pekerja tidak dapat membaca dari partisi yang sama, sehingga partisi yang sangat padat masih dapat menyebabkan pipeline tertinggal.

Praktik terbaik

Bagian ini berisi rekomendasi untuk membaca dari Kafka ke Dataflow.

Topik dengan volume rendah

Skenario umum adalah membaca dari banyak topik bervolume rendah secara bersamaan —misalnya, satu topik per pelanggan. Membuat tugas Dataflow terpisah untuk setiap topik tidak efisien dari segi biaya, karena setiap tugas memerlukan setidaknya satu pekerja penuh. Sebagai gantinya, pertimbangkan opsi berikut:

  • Gabungkan topik. Gabungkan topik sebelum diserap ke dalam Dataflow. Menyerap beberapa topik bervolume tinggi jauh lebih efisien daripada menyerap banyak topik bervolume rendah. Setiap topik bervolume tinggi dapat ditangani oleh satu tugas Dataflow yang sepenuhnya memanfaatkan pekerjanya.

  • Membaca beberapa topik. Jika Anda tidak dapat menggabungkan topik sebelum menyerapnya ke Dataflow, pertimbangkan untuk membuat pipeline yang membaca dari beberapa topik. Pendekatan ini memungkinkan Dataflow menetapkan beberapa topik ke worker yang sama. Ada dua cara untuk menerapkan pendekatan ini:

    • Langkah pembacaan tunggal. Buat satu instance konektor KafkaIO dan konfigurasikan untuk membaca beberapa topik. Kemudian, filter menurut nama topik untuk menerapkan logika yang berbeda per topik. Untuk kode contoh, lihat Membaca dari beberapa topik. Pertimbangkan opsi ini jika semua topik Anda ditempatkan bersama dalam cluster yang sama. Salah satu kekurangannya adalah masalah pada satu tujuan atau transformasi dapat menyebabkan semua topik menumpuk backlog.

      Untuk kasus penggunaan yang lebih lanjut, teruskan sekumpulan objek KafkaSourceDescriptor yang menentukan topik yang akan dibaca. Menggunakan KafkaSourceDescriptor memungkinkan Anda memperbarui daftar topik nanti jika diperlukan. Fitur ini memerlukan Java dengan Runner v2.

    • Beberapa langkah membaca. Untuk membaca dari topik yang berada di cluster yang berbeda, pipeline Anda dapat menyertakan beberapa instance KafkaIO. Saat tugas sedang berjalan, Anda dapat memperbarui setiap sumber menggunakan pemetaan transformasi. Menetapkan topik atau cluster baru hanya didukung saat menggunakan Runner v2. Kemampuan pengamatan adalah potensi tantangan dengan pendekatan ini, karena Anda perlu memantau setiap transformasi bacaan satu per satu, bukan mengandalkan metrik tingkat pipeline.

Melakukan kembali ke Kafka

Secara default, konektor KafkaIO tidak menggunakan offset Kafka untuk melacak progres dan tidak melakukan kembali ke Kafka. Jika Anda memanggil commitOffsetsInFinalize, konektor akan berupaya melakukan commit kembali ke Kafka setelah record di-commit di Dataflow. Rekaman yang di-commit di Dataflow mungkin tidak diproses sepenuhnya, jadi jika Anda membatalkan pipeline, offset mungkin di-commit tanpa rekaman diproses sepenuhnya.

Karena setelan enable.auto.commit=True melakukan offset segera setelah dibaca dari Kafka tanpa pemrosesan apa pun oleh Dataflow, sebaiknya jangan gunakan opsi ini. Sebaiknya setel enable.auto.commit=False dan commitOffsetsInFinalize=True. Jika Anda menyetel enable.auto.commit ke True, data dapat hilang jika pipeline terganggu saat diproses. Data yang sudah di-commit di Kafka mungkin dihapus.

Watermark

Secara default, konektor KafkaIO menggunakan waktu pemrosesan saat ini untuk menetapkan watermark output dan waktu peristiwa. Untuk mengubah perilaku ini, panggil withTimestampPolicyFactory dan tetapkan TimestampPolicy. Beam menyediakan implementasi TimestampPolicy yang menghitung watermark berdasarkan waktu penambahan log Kafka atau waktu pembuatan pesan.

Pertimbangan pelari

Konektor KafkaIO memiliki dua implementasi dasar untuk pembacaan Kafka, yaitu ReadFromKafkaViaUnbounded yang lebih lama dan ReadFromKafkaViaSDF yang lebih baru. Dataflow secara otomatis memilih penerapan terbaik untuk tugas Anda berdasarkan bahasa SDK dan persyaratan tugas Anda. Hindari meminta implementasi runner atau Kafka secara eksplisit kecuali jika Anda memerlukan fitur tertentu yang hanya tersedia dalam implementasi tersebut. Untuk mengetahui informasi selengkapnya tentang cara memilih runner, lihat Menggunakan Runner Dataflow v2.

Jika pipeline Anda menggunakan withTopic atau withTopics, implementasi yang lebih lama akan mengkueri Kafka pada waktu pembuatan pipeline untuk partisi yang tersedia. Mesin yang membuat pipeline harus memiliki izin untuk terhubung ke Kafka. Jika Anda menerima error izin, pastikan Anda memiliki izin untuk terhubung ke Kafka secara lokal. Anda dapat menghindari masalah ini dengan menggunakan withTopicPartitions, yang tidak terhubung ke Kafka pada waktu pembuatan pipeline.

Men-deploy ke produksi

Saat men-deploy solusi dalam produksi, sebaiknya gunakan template Flex. Dengan menggunakan template Flex, pipeline diluncurkan dari lingkungan yang konsisten, yang dapat membantu mengurangi masalah konfigurasi lokal.

Logging dari KafkaIO bisa sangat panjang. Pertimbangkan untuk mengurangi tingkat logging dalam produksi sebagai berikut:

sdkHarnessLogLevelOverrides='{"org.apache.kafka.clients.consumer.internals.SubscriptionState":"WARN"}'.

Untuk mengetahui informasi selengkapnya, lihat Menetapkan level log pekerja pipeline.

Mengonfigurasi jaringan

Secara default, Dataflow meluncurkan instance dalam jaringan Virtual Private Cloud (VPC) default Anda. Bergantung pada konfigurasi Kafka, Anda mungkin perlu mengonfigurasi jaringan dan subnet yang berbeda untuk Dataflow. Untuk mengetahui informasi selengkapnya, lihat Menentukan jaringan dan subnetwork. Saat mengonfigurasi jaringan, buat aturan firewall yang memungkinkan mesin pekerja Dataflow menjangkau broker Kafka.

Jika Anda menggunakan Kontrol Layanan VPC, tempatkan cluster Kafka dalam perimeter Kontrol Layanan VPC, atau perluas perimeter ke VPN atau Cloud Interconnect yang diberi otorisasi.

Jika cluster Kafka Anda di-deploy di luar Google Cloud, Anda harus membuat koneksi jaringan antara Dataflow dan cluster Kafka. Ada beberapa opsi jaringan dengan berbagai kelebihan dan kekurangan:

Dedicated Interconnect adalah opsi terbaik untuk performa dan keandalan yang dapat diprediksi, tetapi penyiapannya dapat memakan waktu lebih lama karena pihak ketiga harus menyediakan sirkuit baru. Dengan topologi berbasis IP publik, Anda dapat memulai dengan cepat karena hanya sedikit pekerjaan jaringan yang perlu dilakukan.

Dua bagian berikutnya menjelaskan opsi ini secara lebih mendetail.

Ruang alamat RFC 1918 bersama

Dedicated Interconnect dan IPsec VPN memberi Anda akses langsung ke alamat IP RFC 1918 di Virtual Private Cloud (VPC), yang dapat menyederhanakan konfigurasi Kafka Anda. Jika Anda menggunakan topologi berbasis VPN, pertimbangkan untuk menyiapkan VPN throughput tinggi.

Secara default, Dataflow meluncurkan instance di jaringan VPC default Anda. Dalam topologi jaringan pribadi dengan rute yang ditentukan secara eksplisit di Cloud Router yang menghubungkan subnetwork di Google Cloud ke cluster Kafka tersebut, Anda memerlukan kontrol lebih besar atas lokasi instance Dataflow Anda. Anda dapat menggunakan Dataflow untuk mengonfigurasi network dan subnetwork parameter eksekusi.

Pastikan subnetwork yang sesuai memiliki alamat IP yang cukup tersedia agar Dataflow dapat meluncurkan instance saat mencoba melakukan penskalaan. Selain itu, saat membuat jaringan terpisah untuk meluncurkan instance Dataflow, pastikan Anda memiliki aturan firewall yang mengaktifkan traffic TCP di antara semua virtual machine dalam project. Jaringan default sudah mengonfigurasi aturan firewall ini.

Ruang alamat IP publik

Arsitektur ini menggunakan Transport Layer Security (TLS) untuk mengamankan lalu lintas antara klien eksternal dan Kafka, serta menggunakan lalu lintas yang tidak terenkripsi untuk komunikasi antar-broker. Jika pendengar Kafka terikat ke antarmuka jaringan yang digunakan untuk komunikasi internal dan eksternal, konfigurasi pendengar akan sederhana. Namun, dalam banyak skenario, alamat broker Kafka yang diiklankan secara eksternal dalam cluster berbeda dengan antarmuka jaringan internal yang digunakan Kafka. Dalam skenario tersebut, Anda dapat menggunakan properti advertised.listeners:

# Configure protocol map
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093

Klien eksternal terhubung menggunakan port 9093 melalui saluran "SSL", dan klien internal terhubung menggunakan port 9092 melalui saluran teks biasa. Saat Anda menentukan alamat di bagian advertised.listeners, gunakan nama DNS (kafkabroker-n.mydomain.com, dalam contoh ini) yang di-resolve ke instance yang sama untuk traffic eksternal dan internal. Penggunaan alamat IP publik mungkin tidak berfungsi karena alamat tersebut mungkin gagal diselesaikan untuk traffic internal.

Menyesuaikan Kafka

Setelan cluster Kafka dan klien Kafka dapat berdampak besar pada performa. Secara khusus, setelan berikut mungkin terlalu rendah. Bagian ini memberikan beberapa titik awal yang disarankan, tetapi Anda harus bereksperimen dengan nilai ini untuk workload tertentu.

  • unboundedReaderMaxElements. Defaultnya adalah 10.000. Nilai yang lebih tinggi seperti 100.000 dapat meningkatkan ukuran bundle, yang dapat meningkatkan performa secara signifikan jika pipeline Anda menyertakan penggabungan. Namun, nilai yang lebih tinggi juga dapat meningkatkan latensi. Untuk menetapkan nilai, gunakan setUnboundedReaderMaxElements. Setelan ini tidak berlaku untuk Runner v2.

  • unboundedReaderMaxReadTimeMs. Defaultnya adalah 10.000 md. Nilai yang lebih tinggi seperti 20.000 md dapat meningkatkan ukuran paket, sedangkan nilai yang lebih rendah seperti 5.000 md dapat mengurangi latensi atau backlog. Untuk menetapkan nilai, gunakan setUnboundedReaderMaxReadTimeMs. Setelan ini tidak berlaku untuk Runner v2.

  • max.poll.records. Defaultnya adalah 500. Nilai yang lebih tinggi mungkin berperforma lebih baik dengan mengambil lebih banyak catatan masuk secara bersamaan, terutama saat menggunakan Runner v2. Untuk menetapkan nilai, panggil withConsumerConfigUpdates.

  • fetch.max.bytes. Defaultnya adalah 1 MB. Nilai yang lebih tinggi dapat meningkatkan throughput dengan mengurangi jumlah permintaan, terutama saat menggunakan Runner v2. Namun, menyetelnya terlalu tinggi dapat meningkatkan latensi, meskipun pemrosesan hilir lebih cenderung menjadi hambatan utama. Nilai awal yang direkomendasikan adalah 100 MB. Untuk menetapkan nilai, panggil withConsumerConfigUpdates.

  • max.partition.fetch.bytes. Defaultnya adalah 1 MB. Parameter ini menetapkan jumlah data maksimum per partisi yang ditampilkan server. Meningkatkan nilai dapat meningkatkan throughput dengan mengurangi jumlah permintaan, terutama saat menggunakan Runner v2. Namun, menyetelnya terlalu tinggi dapat meningkatkan latensi, meskipun pemrosesan di hilir lebih cenderung menjadi hambatan utama. Nilai awal yang direkomendasikan adalah 100 MB. Untuk menetapkan nilai, panggil withConsumerConfigUpdates.

  • consumerPollingTimeout. Defaultnya adalah 2 detik. Jika klien konsumen mengalami waktu tunggu sebelum dapat membaca data apa pun, coba tetapkan nilai yang lebih tinggi. Setelan ini paling sering relevan saat melakukan pembacaan lintas region atau pembacaan dengan jaringan yang lambat. Untuk menetapkan nilai, panggil withConsumerPollingTimeout.

Pastikan receive.buffer.bytes cukup besar untuk menangani ukuran pesan. Jika nilainya terlalu kecil, log mungkin menunjukkan bahwa konsumen terus dibuat ulang dan mencari offset tertentu.

Contoh

Contoh kode berikut menunjukkan cara membuat pipeline Dataflow yang membaca dari Kafka. Saat menggunakan Kredensial Default Aplikasi bersama dengan callback handler yang disediakan oleh Google Cloud Managed Service for Apache Kafka, diperlukan kafka-clients versi 3.7.0 atau yang lebih tinggi.

Membaca dari satu topik

Contoh ini menggunakan konektor I/O Terkelola. Contoh ini menunjukkan cara membaca dari topik Kafka dan menulis payload pesan ke file teks.

Java

Untuk melakukan autentikasi ke Dataflow, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

import com.google.common.collect.ImmutableMap;
import java.io.UnsupportedEncodingException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

public class KafkaRead {

  public static Pipeline createPipeline(Options options) {

    // Create configuration parameters for the Managed I/O transform.
    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("bootstrap_servers", options.getBootstrapServer())
        .put("topic", options.getTopic())
        .put("data_format", "RAW")
        .put("max_read_time_seconds", 15)
        .put("auto_offset_reset_config", "earliest")
        .build();

    // Build the pipeline.
    var pipeline = Pipeline.create(options);
    pipeline
        // Read messages from Kafka.
        .apply(Managed.read(Managed.KAFKA).withConfig(config)).getSinglePCollection()
        // Get the payload of each message and convert to a string.
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via((row -> {
              var bytes = row.getBytes("payload");
              try {
                return new String(bytes, "UTF-8");
              } catch (UnsupportedEncodingException e) {
                throw new RuntimeException(e);
              }
            })))
        // Write the payload to a text file.
        .apply(TextIO
            .write()
            .to(options.getOutputPath())
            .withSuffix(".txt")
            .withNumShards(1));
    return pipeline;
  }
}

Python

Untuk melakukan autentikasi ke Dataflow, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

import argparse

import apache_beam as beam

from apache_beam import window
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


def read_from_kafka() -> None:
    # Parse the pipeline options passed into the application. Example:
    #     --topic=$KAFKA_TOPIC --bootstrap_server=$BOOTSTRAP_SERVER
    #     --output=$CLOUD_STORAGE_BUCKET --streaming
    # For more information, see
    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    class MyOptions(PipelineOptions):
        @staticmethod
        def _add_argparse_args(parser: argparse.ArgumentParser) -> None:
            parser.add_argument("--topic")
            parser.add_argument("--bootstrap_server")
            parser.add_argument("--output")

    options = MyOptions()
    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            # Read messages from an Apache Kafka topic.
            | beam.managed.Read(
                beam.managed.KAFKA,
                config={
                  "bootstrap_servers": options.bootstrap_server,
                  "topic": options.topic,
                  "data_format": "RAW",
                  "auto_offset_reset_config": "earliest",
                  # The max_read_time_seconds parameter is intended for testing.
                  # Avoid using this parameter in production.
                  "max_read_time_seconds": 5
                }
            )
            # Subdivide the output into fixed 5-second windows.
            | beam.WindowInto(window.FixedWindows(5))
            | WriteToText(
                file_path_prefix=options.output, file_name_suffix=".txt", num_shards=1
            )
        )

Membaca dari beberapa topik

Contoh ini menggunakan konektor KafkaIO. Menunjukkan cara membaca dari beberapa topik Kafka dan menerapkan logika pipeline terpisah untuk setiap topik.

Untuk kasus penggunaan yang lebih canggih, teruskan secara dinamis serangkaian objek KafkaSourceDescriptor, sehingga Anda dapat memperbarui daftar topik yang akan dibaca. Pendekatan ini memerlukan Java dengan Runner v2.

Java

Untuk melakukan autentikasi ke Dataflow, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class KafkaReadTopics {

  public static Pipeline createPipeline(Options options) {
    String topic1 = options.getTopic1();
    String topic2 = options.getTopic2();

    // Build the pipeline.
    var pipeline = Pipeline.create(options);
    var allTopics = pipeline
        .apply(KafkaIO.<Long, String>read()
            .withTopics(List.of(topic1, topic2))
            .withBootstrapServers(options.getBootstrapServer())
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withMaxReadTime(Duration.standardSeconds(10))
            .withStartReadTime(Instant.EPOCH)
        );

    // Create separate pipeline branches for each topic.
    // The first branch filters on topic1.
    allTopics
        .apply(Filter.by(record -> record.getTopic().equals(topic1)))
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via(record -> record.getKV().getValue()))
        .apply(TextIO.write()
            .to(topic1)
            .withSuffix(".txt")
            .withNumShards(1)
        );

    // The second branch filters on topic2.
    allTopics
        .apply(Filter.by(record -> record.getTopic().equals(topic2)))
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via(record -> record.getKV().getValue()))
        .apply(TextIO.write()
            .to(topic2)
            .withSuffix(".txt")
            .withNumShards(1)
        );
    return pipeline;
  }
}

Python

Untuk melakukan autentikasi ke Dataflow, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

import argparse

import apache_beam as beam

from apache_beam.io.kafka import ReadFromKafka
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


def read_from_kafka() -> None:
    # Parse the pipeline options passed into the application. Example:
    #   --bootstrap_server=$BOOTSTRAP_SERVER --output=$STORAGE_BUCKET --streaming
    # For more information, see
    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    class MyOptions(PipelineOptions):
        @staticmethod
        def _add_argparse_args(parser: argparse.ArgumentParser) -> None:
            parser.add_argument('--bootstrap_server')
            parser.add_argument('--output')

    options = MyOptions()
    with beam.Pipeline(options=options) as pipeline:
        # Read from two Kafka topics.
        all_topics = pipeline | ReadFromKafka(consumer_config={
                "bootstrap.servers": options.bootstrap_server
            },
            topics=["topic1", "topic2"],
            with_metadata=True,
            max_num_records=10,
            start_read_time=0
        )

        # Filter messages from one topic into one branch of the pipeline.
        (all_topics
            | beam.Filter(lambda message: message.topic == 'topic1')
            | beam.Map(lambda message: message.value.decode('utf-8'))
            | "Write topic1" >> WriteToText(
                file_path_prefix=options.output + '/topic1/output',
                file_name_suffix='.txt',
                num_shards=1))

        # Filter messages from the other topic.
        (all_topics
            | beam.Filter(lambda message: message.topic == 'topic2')
            | beam.Map(lambda message: message.value.decode('utf-8'))
            | "Write topic2" >> WriteToText(
                file_path_prefix=options.output + '/topic2/output',
                file_name_suffix='.txt',
                num_shards=1))

Langkah berikutnya