Dokumen ini menjelaskan cara menulis data teks dari Dataflow ke
Pub/Sub menggunakan konektor I/O
PubSubIO
Apache Beam.
Ringkasan
Untuk menulis data ke Pub/Sub, gunakan konektor PubSubIO
. Elemen
input dapat berupa pesan Pub/Sub atau hanya data pesan.
Jika elemen input adalah pesan Pub/Sub, Anda dapat secara opsional
menetapkan atribut atau kunci pengurutan pada setiap pesan.
Anda dapat menggunakan konektor PubSubIO
versi Java, Python, atau Go,
seperti berikut:
Java
Untuk menulis ke satu topik, panggil metode
PubsubIO.writeMessages
. Metode
ini mengambil kumpulan input objek PubsubMessage
. Konektor juga menentukan metode praktis untuk menulis string, pesan Avro yang dienkode biner, atau pesan protobuf yang dienkode biner. Metode ini mengonversi kumpulan
input menjadi pesan Pub/Sub.
Untuk menulis ke sekumpulan topik dinamis berdasarkan data input, panggil
writeMessagesDynamic
. Tentukan
topik tujuan untuk setiap pesan dengan memanggil PubsubMessage.withTopic
pada
pesan. Misalnya, Anda dapat merutekan pesan ke topik yang berbeda berdasarkan nilai kolom tertentu dalam data input.
Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi PubsubIO
.
Python
Panggil metode pubsub.WriteToPubSub
.
Secara default, metode ini mengambil kumpulan input jenis bytes
,
yang merepresentasikan payload pesan. Jika parameter with_attributes
adalah
True
, metode akan mengambil kumpulan objek PubsubMessage
.
Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi modul pubsub
.
Go
Untuk menulis data ke Pub/Sub, panggil metode
pubsubio.Write
. Metode ini mengambil
kumpulan input objek PubSubMessage
atau slice byte yang berisi
payload pesan.
Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi paket pubsubio
.
Untuk mengetahui informasi selengkapnya tentang pesan Pub/Sub, lihat Format pesan dalam dokumentasi Pub/Sub.
Stempel waktu
Pub/Sub menetapkan stempel waktu pada setiap pesan. Stempel waktu ini
menunjukkan waktu saat pesan dipublikasikan ke Pub/Sub. Dalam skenario streaming, Anda mungkin juga memperhatikan stempel waktu peristiwa, yaitu waktu saat data pesan dibuat. Anda dapat menggunakan
stempel waktu elemen Apache Beam
untuk merepresentasikan waktu peristiwa. Sumber yang membuat PCollection
tanpa batas sering kali menetapkan stempel waktu yang sesuai dengan waktu peristiwa untuk setiap elemen baru.
Untuk Java dan Python, konektor I/O Pub/Sub dapat menulis stempel waktu setiap elemen sebagai atribut pesan Pub/Sub. Konsumen pesan dapat menggunakan atribut ini untuk mendapatkan stempel waktu peristiwa.
Java
Panggil PubsubIO.Write<T>.withTimestampAttribute
dan tentukan nama atribut.
Python
Tentukan parameter timestamp_attribute
saat Anda memanggil WriteToPubSub
.
Pengiriman pesan
Dataflow mendukung pemrosesan tepat satu kali pesan dalam pipeline. Namun, konektor I/O Pub/Sub tidak dapat menjamin pengiriman pesan tepat satu kali melalui Pub/Sub.
Untuk Java dan Python, Anda dapat mengonfigurasi konektor I/O Pub/Sub untuk menulis ID unik setiap elemen sebagai atribut pesan. Konsumen pesan kemudian dapat menggunakan atribut ini untuk menghapus duplikat pesan.
Java
Panggil PubsubIO.Write<T>.withIdAttribute
dan tentukan nama atribut.
Python
Tentukan parameter id_label
saat Anda memanggil WriteToPubSub
.
Output langsung
Jika Anda mengaktifkan mode streaming setidaknya sekali di pipeline, konektor I/O akan menggunakan output langsung. Dalam mode ini, konektor tidak melakukan checkpoint pesan, sehingga penulisan lebih cepat. Namun, coba lagi dalam mode ini dapat menyebabkan pesan duplikat dengan ID pesan yang berbeda, sehingga mungkin mempersulit konsumen pesan untuk menghapus duplikat pesan.
Untuk pipeline yang menggunakan mode tepat sekali, Anda dapat mengaktifkan output langsung dengan
menetapkan streaming_enable_pubsub_direct_output
opsi layanan. Output langsung
mengurangi latensi penulisan dan menghasilkan pemrosesan yang lebih efisien. Pertimbangkan opsi ini jika konsumen pesan Anda dapat menangani pesan duplikat dengan ID pesan yang tidak unik.
Contoh
Contoh berikut membuat PCollection
pesan Pub/Sub
dan menuliskannya ke topik Pub/Sub. Topik ditentukan sebagai opsi
pipeline. Setiap pesan berisi data payload dan serangkaian atribut.
Java
Untuk melakukan autentikasi ke Dataflow, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.
Python
Untuk melakukan autentikasi ke Dataflow, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.