為 Dataflow 範本建立使用者定義的函式

部分 Google 提供的 Dataflow 範本支援使用者定義函式 (UDF)。您可以使用 UDF 擴充範本的功能,不必修改範本程式碼。

總覽

如要建立 UDF,請視範本而定,編寫 JavaScript 函式或 Python 函式。將 UDF 程式碼檔案儲存在 Cloud Storage,並將位置指定為範本參數。範本會針對每個輸入元素呼叫您的函式。函式會轉換元素或執行其他自訂邏輯,然後將結果傳回範本。

舉例來說,您可以使用 UDF 執行下列操作:

  • 重新設定輸入資料格式,使其符合目標結構定義。
  • 遮蓋機密資料。
  • 從輸出內容中篩選部分元素。

UDF 函式的輸入內容是單一資料元素,並序列化為 JSON 字串。此函式會傳回序列化的 JSON 字串做為輸出內容。資料格式取決於範本。舉例來說,在「Pub/Sub 訂閱項目到 BigQuery」範本中,輸入內容是序列化為 JSON 物件的 Pub/Sub 訊息資料,輸出內容則是代表 BigQuery 資料表列的序列化 JSON 物件。詳情請參閱各範本的說明文件

使用 UDF 執行範本

如要使用 UDF 執行範本,請指定 JavaScript 檔案的 Cloud Storage 位置和函式名稱做為範本參數。

您也可以使用 Google 提供的部分範本,直接在Google Cloud 控制台中建立 UDF,方法如下:

  1. 前往 Google Cloud 控制台的 Dataflow 頁面。

    前往 Dataflow 頁面

  2. 按一下「Create job from template」(利用範本建立工作)

  3. 選取要執行的 Google 範本。

  4. 展開「Optional parameters」(選用參數)。如果範本支援 UDF,就會有 UDF 的 Cloud Storage 位置參數,以及函式名稱參數。

  5. 在範本參數旁,按一下「建立 UDF」

  6. 在「選取或建立使用者定義函式 (UDF)」面板中:

    1. 輸入檔案名稱。範例:my_udf.js
    2. 選取 Cloud Storage 資料夾。 範例:gs://your-bucket/your-folder
    3. 使用內嵌程式碼編輯器編寫函式。編輯器會預先填入範本程式碼,方便您著手開發。
    4. 按一下「建立 UDF」

      Google Cloud 控制台會儲存 UDF 檔案,並填入 Cloud Storage 位置。

    5. 在對應欄位中輸入函式名稱。

編寫 JavaScript UDF

下列程式碼顯示您可以從中開始的空值 JavaScript UDF:

/*
 * @param {string} inJson input JSON message (stringified)
 * @return {?string} outJson output JSON message (stringified)
 */
function process(inJson) {
  const obj = JSON.parse(inJson);

  // Example data transformations:
  // Add a field: obj.newField = 1;
  // Modify a field: obj.existingField = '';
  // Filter a record: return null;

  return JSON.stringify(obj);
}

JavaScript 程式碼會在 Nashorn JavaScript 引擎上執行。建議您先在 Nashorn 引擎上測試 UDF,再進行部署。Nashorn 引擎與 Node.js 的 JavaScript 實作方式並不完全相同。常見問題是使用 console.log()Number.isNaN(),但這兩者都未在 Nashorn 引擎中定義。

您可以使用已預先安裝 JDK 11 的 Cloud Shell,在 Nashorn 引擎上測試 UDF。以互動模式啟動 Nashorn,如下所示:

jjs --language=es6

在 Nashorn 互動式殼層中,執行下列步驟:

  1. 呼叫 load 載入 UDF JavaScript 檔案。
  2. 視管道預期的訊息而定,定義輸入 JSON 物件。
  3. 使用 JSON.stringify 函式將輸入內容序列化為 JSON 字串。
  4. 呼叫 UDF 函式來處理 JSON 字串。
  5. 呼叫 JSON.parse 來還原輸出內容的序列化狀態。
  6. 確認結果。

範例:

> load('my_udf.js')
> var input = {"name":"user1"}
> var output = process(JSON.stringify(input))
> print(output)

編寫 Python UDF

下列程式碼顯示可做為起點的無運算 Python UDF:

import json
def process(value):
  # Load the JSON string into a dictionary.
  data = json.loads(value)

  # Transform the data in some way.
  data['new_field'] = 'new_value'

  # Serialize the data back to JSON.
  return json.dumps(data)

Python UDF 支援 Python 和 Apache Beam 的標準依附元件套件。他們無法使用第三方套件。

處理錯誤

一般來說,如果執行 UDF 時發生錯誤,系統會將錯誤寫入無效郵件位置。詳細資料視範本而定。舉例來說,「Pub/Sub 訂閱項目到 BigQuery」範本會建立 _error_records 資料表,並將錯誤寫入該處。語法錯誤或未處理的例外狀況都可能導致執行階段 UDF 錯誤。如要檢查語法錯誤,請在本機測試 UDF。

您可以針對不應處理的元素,以程式輔助方式擲回例外狀況。如果範本支援,系統會將元素寫入死信位置。如需這個方法的範例,請參閱「路由事件」。

應用實例

本節根據實際應用案例,說明 UDF 的常見模式。

豐富活動內容

使用 UDF 透過新欄位擴充事件,取得更多背景資訊。

範例:

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Add new field to track data source
  data.source = "source1";
  return JSON.stringify(data);
}

轉換事件

視目的地預期格式而定,使用 UDF 轉換整個事件格式。

以下範例會將 Cloud Logging 記錄檔項目 (LogEntry) 還原為原始記錄字串 (如有)。(視記錄來源而定,原始記錄字串有時會填入 textPayload 欄位。)您可能會使用這個模式,以原始格式傳送原始記錄,而不是從 Cloud Logging 傳送整個 LogEntry

 function process(inJson) {
  const data = JSON.parse(inJson);

  if (data.textPayload) {
    return data.textPayload; // Return string value, and skip JSON.stringify
  }
 return JSON.stringify(obj);
}

遮蓋或移除事件資料

使用 UDF 遮蓋或移除事件的一部分。

以下範例會透過取代欄位名稱 sensitiveField 的值來遮蓋該欄位,並完全移除名為 redundantField 的欄位。

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Normalize existing field values
  data.source = (data.source && data.source.toLowerCase()) || "unknown";

  // Redact existing field values
  if (data.sensitiveField) {
    data.sensitiveField = "REDACTED";
  }

  // Remove existing fields
  if (data.redundantField) {
    delete(data.redundantField);
  }

  return JSON.stringify(data);
}

路徑事件

使用 UDF 將事件傳送至下游接收器中的不同目的地。

以下範例是以「將 Pub/Sub 的內容傳輸到 Splunk」範本為基礎,將每個事件傳輸到正確的 Splunk 索引。這個函式會呼叫使用者定義的本機函式,將事件對應至索引。

function process(inJson) {
  const obj = JSON.parse(inJson);
  
  // Set index programmatically for data segregation in Splunk
  obj._metadata = {
    index: splunkIndexLookup(obj)
  }
  return JSON.stringify(obj);
}  

下一個範例會將無法辨識的事件傳送至死信佇列,前提是範本支援死信佇列。(例如,請參閱「Pub/Sub to JDBC」範本)。您可能會使用這個模式,在寫入目的地之前,篩除非預期的項目。

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Route unrecognized events to the deadletter topic
  if (!data.hasOwnProperty('severity')) {
    throw new Error("Unrecognized event. eventId='" + data.Id + "'");
  }

  return JSON.stringify(data);

篩選事件

使用 UDF 從輸出內容中篩除不想要的或無法辨識的事件。

以下範例會捨棄 data.severity 等於 "DEBUG" 的事件。

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Drop events with certain field values
  if (data.severity == "DEBUG") {
    return null;
  }

  return JSON.stringify(data);
}

後續步驟