Beberapa template Dataflow yang disediakan Google mendukung fungsi yang ditentukan pengguna (UDF). Dengan UDF, Anda dapat memperluas fungsi template tanpa mengubah kode template.
Ringkasan
Untuk membuat UDF, Anda menulis fungsi JavaScript atau fungsi Python, bergantung pada template. Anda menyimpan file kode UDF di Cloud Storage dan menentukan lokasi sebagai parameter template. Untuk setiap elemen input, template memanggil fungsi Anda. Fungsi ini mengubah elemen atau melakukan logika kustom lainnya dan menampilkan hasilnya kembali ke template.
Misalnya, Anda dapat menggunakan UDF untuk:
- Memformat ulang data input agar sesuai dengan skema target.
- Menyamarkan data sensitif.
- Memfilter beberapa elemen dari output.
Input ke fungsi UDF adalah satu elemen data, yang diserialkan sebagai string JSON. Fungsi ini menampilkan string JSON yang diserialkan sebagai output. Format data bergantung pada template. Misalnya, dalam template Pub/Sub Subscription to BigQuery, inputnya adalah data pesan Pub/Sub yang diserialisasi sebagai objek JSON, dan outputnya adalah objek JSON yang diserialisasi yang merepresentasikan baris tabel BigQuery. Untuk mengetahui informasi selengkapnya, lihat dokumentasi untuk setiap template.
Menjalankan template dengan UDF
Untuk menjalankan template dengan UDF, Anda menentukan lokasi Cloud Storage file JavaScript dan nama fungsi sebagai parameter template.
Dengan beberapa template yang disediakan Google, Anda juga dapat membuat UDF langsung di konsolGoogle Cloud , sebagai berikut:
Buka halaman Dataflow di konsol Google Cloud .
Klik add_boxBuat tugas dari template.
Pilih template yang disediakan Google yang ingin Anda jalankan.
Luaskan Parameter opsional. Jika template mendukung UDF, template tersebut memiliki parameter untuk lokasi UDF Cloud Storage dan parameter lain untuk nama fungsi.
Di samping parameter template, klik Buat UDF.
Di panel Pilih atau Buat Fungsi yang Ditentukan Pengguna (UDF):
- Masukkan nama file. Contoh:
my_udf.js
. - Pilih folder Cloud Storage.
Contoh:
gs://your-bucket/your-folder
. - Gunakan editor kode inline untuk menulis fungsi. Editor sudah diisi dengan kode boilerplate yang dapat Anda gunakan sebagai titik awal.
Klik Buat UDF.
Konsol Google Cloud menyimpan file UDF dan mengisi lokasi Cloud Storage.
Masukkan nama fungsi Anda di kolom yang sesuai.
- Masukkan nama file. Contoh:
Menulis UDF JavaScript
Kode berikut menunjukkan UDF JavaScript no-op yang dapat Anda mulai:
/*
* @param {string} inJson input JSON message (stringified)
* @return {?string} outJson output JSON message (stringified)
*/
function process(inJson) {
const obj = JSON.parse(inJson);
// Example data transformations:
// Add a field: obj.newField = 1;
// Modify a field: obj.existingField = '';
// Filter a record: return null;
return JSON.stringify(obj);
}
Kode JavaScript berjalan di
mesin JavaScript Nashorn. Sebaiknya
uji UDF Anda di mesin Nashorn sebelum men-deploy-nya. Mesin Nashorn
tidak sama persis dengan penerapan JavaScript Node.js. Masalah umum adalah penggunaan console.log()
atau Number.isNaN()
, yang keduanya tidak ditentukan dalam mesin Nashorn.
Anda dapat menguji UDF di mesin Nashorn menggunakan Cloud Shell, yang telah menginstal JDK 11. Luncurkan Nashorn dalam mode interaktif sebagai berikut:
jjs --language=es6
Di shell interaktif Nashorn, lakukan langkah-langkah berikut:
- Panggil
load
untuk memuat file JavaScript UDF Anda. - Tentukan objek JSON input bergantung pada pesan yang diharapkan pipeline Anda.
- Gunakan fungsi
JSON.stringify
untuk melakukan serialisasi input ke string JSON. - Panggil fungsi UDF Anda untuk memproses string JSON.
- Panggil
JSON.parse
untuk mendeserialisasi output. - Verifikasi hasilnya.
Contoh:
> load('my_udf.js')
> var input = {"name":"user1"}
> var output = process(JSON.stringify(input))
> print(output)
Menulis UDF Python
Kode berikut menunjukkan UDF Python no-op yang dapat Anda mulai:
import json
def process(value):
# Load the JSON string into a dictionary.
data = json.loads(value)
# Transform the data in some way.
data['new_field'] = 'new_value'
# Serialize the data back to JSON.
return json.dumps(data)
UDF Python mendukung paket dependensi yang standar untuk Python dan Apache Beam. Mereka tidak dapat menggunakan paket pihak ketiga.
Penanganan error
Biasanya, jika terjadi error selama eksekusi UDF, error tersebut akan ditulis ke lokasi pesan yang tidak terkirim. Detailnya bergantung pada template. Misalnya, template
Pub/Sub Subscription to BigQuery
membuat tabel _error_records
dan menulis error di sana. Error UDF Runtime dapat terjadi karena error sintaksis atau pengecualian yang tidak tertangkap. Untuk memeriksa
error sintaksis, uji UDF Anda secara lokal.
Anda dapat memunculkan pengecualian secara terprogram untuk elemen yang tidak boleh diproses. Dalam kasus ini, elemen ditulis ke lokasi pesan yang tidak terkirim, jika template mendukungnya. Untuk contoh yang menunjukkan pendekatan ini, lihat Merutekan peristiwa.
Contoh kasus penggunaan
Bagian ini menjelaskan beberapa pola umum untuk UDF, berdasarkan kasus penggunaan di dunia nyata.
Memperkaya acara
Gunakan UDF untuk memperkaya peristiwa dengan kolom baru untuk informasi yang lebih kontekstual.
Contoh:
function process(inJson) {
const data = JSON.parse(inJson);
// Add new field to track data source
data.source = "source1";
return JSON.stringify(data);
}
Mengubah peristiwa
Gunakan UDF untuk mengubah seluruh format peristiwa, bergantung pada yang diharapkan tujuan Anda.
Contoh berikut mengembalikan entri log Cloud Logging
(LogEntry
) ke string log
asli jika tersedia. (Bergantung pada sumber log, string log asli terkadang diisi di kolom textPayload
.) Anda dapat menggunakan pola ini untuk
mengirim log mentah dalam format aslinya, bukan mengirim seluruh
LogEntry
dari Cloud Logging.
function process(inJson) {
const data = JSON.parse(inJson);
if (data.textPayload) {
return data.textPayload; // Return string value, and skip JSON.stringify
}
return JSON.stringify(obj);
}
Menyamarkan atau menghapus data peristiwa
Gunakan UDF untuk menyamarkan atau menghapus bagian peristiwa.
Contoh berikut menyamarkan nama kolom sensitiveField
dengan mengganti nilainya, dan menghapus kolom bernama redundantField
sepenuhnya.
function process(inJson) {
const data = JSON.parse(inJson);
// Normalize existing field values
data.source = (data.source && data.source.toLowerCase()) || "unknown";
// Redact existing field values
if (data.sensitiveField) {
data.sensitiveField = "REDACTED";
}
// Remove existing fields
if (data.redundantField) {
delete(data.redundantField);
}
return JSON.stringify(data);
}
Peristiwa rute
Gunakan UDF untuk merutekan peristiwa ke tujuan terpisah di sink hilir.
Contoh berikut, berdasarkan template Pub/Sub ke Splunk, merutekan setiap peristiwa ke indeks Splunk yang benar. Fungsi ini memanggil fungsi lokal yang ditentukan pengguna untuk memetakan peristiwa ke indeks.
function process(inJson) {
const obj = JSON.parse(inJson);
// Set index programmatically for data segregation in Splunk
obj._metadata = {
index: splunkIndexLookup(obj)
}
return JSON.stringify(obj);
}
Contoh berikutnya merutekan peristiwa yang tidak dikenali ke antrean pesan yang tidak terkirim, dengan asumsi template mendukung antrean pesan yang tidak terkirim. (Misalnya, lihat template Pub/Sub ke JDBC.) Anda dapat menggunakan pola ini untuk memfilter entri yang tidak terduga sebelum menulis ke tujuan.
function process(inJson) {
const data = JSON.parse(inJson);
// Route unrecognized events to the deadletter topic
if (!data.hasOwnProperty('severity')) {
throw new Error("Unrecognized event. eventId='" + data.Id + "'");
}
return JSON.stringify(data);
Filter peristiwa
Gunakan UDF untuk memfilter peristiwa yang tidak diinginkan atau tidak dikenali dari output.
Contoh berikut menghapus peristiwa saat data.severity
sama dengan "DEBUG"
.
function process(inJson) {
const data = JSON.parse(inJson);
// Drop events with certain field values
if (data.severity == "DEBUG") {
return null;
}
return JSON.stringify(data);
}
Langkah berikutnya
- Template yang disediakan Google
- Membangun dan menjalankan Template Flex
- Menjalankan template klasik
- Memperluas template Dataflow dengan UDF (postingan blog)
- Contoh UDF (GitHub)