部分 Google 提供的 Dataflow 範本支援使用者定義函式 (UDF)。您可以使用 UDF 擴充範本的功能,不必修改範本程式碼。
總覽
如要建立 UDF,請視範本而定,編寫 JavaScript 函式或 Python 函式。將 UDF 程式碼檔案儲存在 Cloud Storage,並將位置指定為範本參數。範本會針對每個輸入元素呼叫您的函式。函式會轉換元素或執行其他自訂邏輯,然後將結果傳回範本。
舉例來說,您可以使用 UDF 執行下列操作:
- 重新設定輸入資料格式,使其符合目標結構定義。
- 遮蓋機密資料。
- 從輸出內容中篩選部分元素。
UDF 函式的輸入內容是單一資料元素,並序列化為 JSON 字串。此函式會傳回序列化的 JSON 字串做為輸出內容。資料格式取決於範本。舉例來說,在「Pub/Sub 訂閱項目到 BigQuery」範本中,輸入內容是序列化為 JSON 物件的 Pub/Sub 訊息資料,輸出內容則是代表 BigQuery 資料表列的序列化 JSON 物件。詳情請參閱各範本的說明文件。
使用 UDF 執行範本
如要使用 UDF 執行範本,請指定 JavaScript 檔案的 Cloud Storage 位置和函式名稱做為範本參數。
您也可以使用 Google 提供的部分範本,直接在Google Cloud 控制台中建立 UDF,方法如下:
前往 Google Cloud 控制台的 Dataflow 頁面。
按一下「Create job from template」(利用範本建立工作)add_box。
選取要執行的 Google 範本。
展開「Optional parameters」(選用參數)。如果範本支援 UDF,就會有 UDF 的 Cloud Storage 位置參數,以及函式名稱參數。
在範本參數旁,按一下「建立 UDF」。
在「選取或建立使用者定義函式 (UDF)」面板中:
- 輸入檔案名稱。範例:
my_udf.js
。 - 選取 Cloud Storage 資料夾。
範例:
gs://your-bucket/your-folder
。 - 使用內嵌程式碼編輯器編寫函式。編輯器會預先填入範本程式碼,方便您著手開發。
按一下「建立 UDF」。
Google Cloud 控制台會儲存 UDF 檔案,並填入 Cloud Storage 位置。
在對應欄位中輸入函式名稱。
- 輸入檔案名稱。範例:
編寫 JavaScript UDF
下列程式碼顯示您可以從中開始的空值 JavaScript UDF:
/*
* @param {string} inJson input JSON message (stringified)
* @return {?string} outJson output JSON message (stringified)
*/
function process(inJson) {
const obj = JSON.parse(inJson);
// Example data transformations:
// Add a field: obj.newField = 1;
// Modify a field: obj.existingField = '';
// Filter a record: return null;
return JSON.stringify(obj);
}
JavaScript 程式碼會在 Nashorn JavaScript 引擎上執行。建議您先在 Nashorn 引擎上測試 UDF,再進行部署。Nashorn 引擎與 Node.js 的 JavaScript 實作方式並不完全相同。常見問題是使用 console.log()
或 Number.isNaN()
,但這兩者都未在 Nashorn 引擎中定義。
您可以使用已預先安裝 JDK 11 的 Cloud Shell,在 Nashorn 引擎上測試 UDF。以互動模式啟動 Nashorn,如下所示:
jjs --language=es6
在 Nashorn 互動式殼層中,執行下列步驟:
- 呼叫
load
載入 UDF JavaScript 檔案。 - 視管道預期的訊息而定,定義輸入 JSON 物件。
- 使用
JSON.stringify
函式將輸入內容序列化為 JSON 字串。 - 呼叫 UDF 函式來處理 JSON 字串。
- 呼叫
JSON.parse
來還原輸出內容的序列化狀態。 - 確認結果。
範例:
> load('my_udf.js')
> var input = {"name":"user1"}
> var output = process(JSON.stringify(input))
> print(output)
編寫 Python UDF
下列程式碼顯示可做為起點的無運算 Python UDF:
import json
def process(value):
# Load the JSON string into a dictionary.
data = json.loads(value)
# Transform the data in some way.
data['new_field'] = 'new_value'
# Serialize the data back to JSON.
return json.dumps(data)
Python UDF 支援 Python 和 Apache Beam 的標準依附元件套件。他們無法使用第三方套件。
處理錯誤
一般來說,如果執行 UDF 時發生錯誤,系統會將錯誤寫入無效郵件位置。詳細資料視範本而定。舉例來說,「Pub/Sub 訂閱項目到 BigQuery」範本會建立 _error_records
資料表,並將錯誤寫入該處。語法錯誤或未處理的例外狀況都可能導致執行階段 UDF 錯誤。如要檢查語法錯誤,請在本機測試 UDF。
您可以針對不應處理的元素,以程式輔助方式擲回例外狀況。如果範本支援,系統會將元素寫入死信位置。如需這個方法的範例,請參閱「路由事件」。
應用實例
本節根據實際應用案例,說明 UDF 的常見模式。
豐富活動內容
使用 UDF 透過新欄位擴充事件,取得更多背景資訊。
範例:
function process(inJson) {
const data = JSON.parse(inJson);
// Add new field to track data source
data.source = "source1";
return JSON.stringify(data);
}
轉換事件
視目的地預期格式而定,使用 UDF 轉換整個事件格式。
以下範例會將 Cloud Logging 記錄檔項目 (LogEntry
) 還原為原始記錄字串 (如有)。(視記錄來源而定,原始記錄字串有時會填入 textPayload
欄位。)您可能會使用這個模式,以原始格式傳送原始記錄,而不是從 Cloud Logging 傳送整個 LogEntry
。
function process(inJson) {
const data = JSON.parse(inJson);
if (data.textPayload) {
return data.textPayload; // Return string value, and skip JSON.stringify
}
return JSON.stringify(obj);
}
遮蓋或移除事件資料
使用 UDF 遮蓋或移除事件的一部分。
以下範例會透過取代欄位名稱 sensitiveField
的值來遮蓋該欄位,並完全移除名為 redundantField
的欄位。
function process(inJson) {
const data = JSON.parse(inJson);
// Normalize existing field values
data.source = (data.source && data.source.toLowerCase()) || "unknown";
// Redact existing field values
if (data.sensitiveField) {
data.sensitiveField = "REDACTED";
}
// Remove existing fields
if (data.redundantField) {
delete(data.redundantField);
}
return JSON.stringify(data);
}
路徑事件
使用 UDF 將事件傳送至下游接收器中的不同目的地。
以下範例是以「將 Pub/Sub 的內容傳輸到 Splunk」範本為基礎,將每個事件傳輸到正確的 Splunk 索引。這個函式會呼叫使用者定義的本機函式,將事件對應至索引。
function process(inJson) {
const obj = JSON.parse(inJson);
// Set index programmatically for data segregation in Splunk
obj._metadata = {
index: splunkIndexLookup(obj)
}
return JSON.stringify(obj);
}
下一個範例會將無法辨識的事件傳送至死信佇列,前提是範本支援死信佇列。(例如,請參閱「Pub/Sub to JDBC」範本)。您可能會使用這個模式,在寫入目的地之前,篩除非預期的項目。
function process(inJson) {
const data = JSON.parse(inJson);
// Route unrecognized events to the deadletter topic
if (!data.hasOwnProperty('severity')) {
throw new Error("Unrecognized event. eventId='" + data.Id + "'");
}
return JSON.stringify(data);
篩選事件
使用 UDF 從輸出內容中篩除不想要的或無法辨識的事件。
以下範例會捨棄 data.severity
等於 "DEBUG"
的事件。
function process(inJson) {
const data = JSON.parse(inJson);
// Drop events with certain field values
if (data.severity == "DEBUG") {
return null;
}
return JSON.stringify(data);
}
後續步驟
- Google 提供的範本
- 建構及執行 Flex 範本
- 執行傳統範本
- 使用 UDF 擴充 Dataflow 範本 (網誌文章)
- UDF 範例 (GitHub)