高度平行工作流程的最佳做法

本頁面提供指南,說明建構及執行 Dataflow HPC 高度平行工作流程時應遵循的最佳做法,包括如何在管道中使用外部程式碼、如何執行管道,以及如何管理錯誤處理。

在管道中加入外部程式碼

高平行管道的主要差異在於,這類管道使用 DoFn 內的 C++ 程式碼,而非標準 Apache Beam SDK 語言。如果是 Java 管道,建議您使用外部程序呼叫,以便在管道中輕鬆使用 C++ 程式庫。本節說明在 Java 管道中執行外部 (C++) 程式碼的一般方式。

Apache Beam pipeline 定義包含幾個重要元件:

  • PCollections 是同質元素的不可變動集合。
  • PTransforms 用於定義 PCollection 的轉換,以產生另一個 PCollection
  • 管道是透過程式碼宣告 PTransformsPCollections 之間互動的建構函式。管道會以有向非循環圖 (DAG) 表示,

如果您使用的程式碼語言並非標準 Apache Beam SDK 語言,請將程式碼放在 DoFn 內的 PTransform 中,並使用其中一種標準 SDK 語言定義管道本身。建議您使用 Apache Beam Python SDK 定義管道,因為 Python SDK 具有實用程式類別,可簡化其他程式碼的使用方式。不過,您可以使用其他 Apache Beam SDK。

您可以先使用上述程式碼立即進行實驗,不需從無到有建構程式。在實際運作系統中,通常會建立自己開發的二進位檔,讓您可以視自己的需要自由調整處理邏輯。

下列圖表說明了管道資料的兩種用法:

  • 資料用以驅動程序。
  • 在程序進行的過程以及與驅動程式資料彙整時取得資料。

管線資料的兩個階段

在本頁面中,主要資料 (來自來源) 稱為「驅動資料」,次要資料 (來自處理階段) 稱為「彙整資料」

以金融用途來說,驅動資料可能會是幾十萬筆的交易記錄。每筆交易都要與市場資料一起處理。在這種情況下,市場資料就是彙整資料。若是媒體用途,驅動資料可能是需要處理的圖片檔,但不需其他資料來源,這樣就不須使用彙整資料。

驅動資料大小的考慮因素

如果驅動資料元件大小落在低 MB 量的範圍內,則應以正常的 Apache Beam 案例來處理,從來源資料建立一個 PCollection 物件,並將此物件交給 Apache Beam 進行轉換,以供程序使用。

如果驅動資料元件大小落在高 MB 量或甚至 GB 量級,通常是媒體類的資料,您可將此驅動資料存入 Cloud Storage。接著,在起始 PCollection 物件中,參照儲存空間的 URI,並只使用該資料的 URI 參照。

彙整資料大小的考慮因素

如果彙整資料只有數百 MB 或更少,使用側向輸入將此資料傳送給 Apache Beam 進行轉換。側向輸入會將資料封包傳送給每個有需要的工作站。

如果彙整資料在 GB 或 TB 範圍內,請依資料的性質來決定使用 Bigtable 或 Cloud Storage,將彙整資料合併到驅動資料中。Bigtable 適用於金融類的資料,其中市場資料通常都以 Bigtable 查詢結果的鍵值對應來存取。如要進一步瞭解如何設計 Bigtable 結構定義,包括處理時間序列資料的建議,請參閱下列 Bigtable 說明文件:

執行外部程式碼

您可以使用多種方式在 Apache Beam 中執行外部程式碼。

  • 在 Dataflow 轉換中,從 DoFn 物件呼叫程序。

  • 搭配 Java SDK 使用 JNI

  • 直接從 DoFn 物件建立子程序。雖然這不是最有效率的做法,卻是一個穩定且易於維護的方法。由於使用 JNI 可能導致潛在問題,因此本頁面會示範如何使用子程序呼叫。

設計工作流程時,請以完整的端對端管道作思考。從來源到接收器的整個資料移動作業都由單一管道完成,因此在執行程序的過程中,不會出現效能低落的狀況。若要與其他方法相比較,別忘了比對端對端所花的時間與耗費的成本。

將二進位檔提取到主機中

使用原生 Apache Beam 語言時,Apache Beam SDK 會自動將所有必要程式碼移至工作站。但若呼叫的是外部程式碼,則必須手動移動程式碼。

存在值區中的二進位檔案

如要移動程式碼,請執行以下操作。這個範例會示範 Apache Beam Java SDK 的步驟。

  1. 將已編譯的外部程式碼以及版本訊息儲存在 Cloud Storage 中。
  2. @Setup 方法中,新增一個同步區塊,以檢查本機資源中是否有可用的程式碼檔案。第一個執行緒結束時,您可以使用一個靜態變數來確認可用性,而非實作實際檢查。
  3. 若無法使用檔案,透過 Cloud Storage 用戶端程式庫將檔案從 Cloud Storage 值區提取到本機工作站。建議採取的作法是使用 Apache Beam FileSystems 類別來執行此工作。
  4. 檔案移動後,確認程式碼檔案上的執行位元已被設定。
  5. 在實際運作系統中,檢查二進位檔的雜湊碼,確保已正確複製檔案。

使用 Apache Beam filesToStage 函式也是一個方法,但此方法會失去執行器自動移動跟封裝 Java 程式碼的一些優點。此外,子程序的呼叫需要絕對檔案路徑,您必須透過程式碼來決定類別路徑,以及 filesToStage 所移動檔案的位置。我們不建議使用這個方法。

執行外部二進位檔

在您執行外部程式碼前,您必須先為其建構一個包裝函式。包裝函式可使用與外部程式碼相同的語言 (例如 C++) 或是 shell 指令碼來撰寫。包裝函式可讓您傳送檔案控點及實作最佳化,如本頁面「為短週期 CPU 設計程序」一節所述。包裝函式不用太複雜,底下的程式碼片段概略呈現了用 C++ 所寫的包裝函式。

int main(int argc, char* argv[])
{
    if(argc < 3){
        std::cerr << "Required return file and data to process" << '\n';
        return 1;
    }

    std::string returnFile = argv[1];
    std::string word = argv[2];

    std::ofstream myfile;
    myfile.open (returnFile);
    myfile << word;
    myfile.close();
    return 0;
}

此程式碼從傳入引數清單中讀取兩個參數值。第一個參數是資料推送到的傳回檔案位置。第二個參數是程式碼會回應給使用者的資料。在實作中,這段程式碼通常會比「Hello, world」回應要來的更有內容一點!

完成包裝函式後,依下列步驟執行外部程式碼:

  1. 將資料傳送給外部程式碼的二進位檔。
  2. 執行二進位檔,擷取任何可能會發生的錯誤,並將錯誤訊息及結果記錄下來。
  3. 處理登入資訊。
  4. 取得處理完成後得到的資料。

傳送資料給二進位檔

如要開始執行程式庫的程序,請將資料傳送給 C++ 程式碼。 在這個步驟中,您可以利用 Dataflow 整合其他 Google Cloud 工具所帶來的好處,像 Bigtable 這類工具可以處理非常龐大的資料集,並處理高同步低延遲的存取,允許上千個核心同時存取資料集。此外,Bigtable 可預先處理資料,允許資料成形、充實資料以及過濾。在您執行外部程式碼前,這些工作都可在 Apache Beam 轉換中完成。

在實際運作系統上,建議使用協議緩衝區來作為封裝輸入資料的路徑。輸入資料在傳送到外部程式庫之前,可先轉換為位元組以及 base64 的編碼處理。有兩種方式可以傳送資料到外部程式庫:

  • 小型輸入資料:對於還未超過指令引數最大長度的小型資料,將引數傳送到 java.lang.ProcessBuilder
  • 大型輸入資料:對於大型資料,新增一個檔名含有 UUID 的檔案,以裝載程序所需要的資料。

執行 C++ 程式碼、擷取錯誤與記錄

擷取及處理錯誤訊息是管道非常重要的一部分。Dataflow 執行器使用的資源為臨時資源,而且通常很難檢查工作站記錄檔。您必須確保擷取所有必要的資訊,推送到 Dataflow 執行器的記錄中,並將記錄下來的資料儲存在一個或多個 Cloud Storage 值區裡。

建議的作法是將 stdoutstderr 重新導向到檔案,可避免記憶體不足的疑慮。舉例來說,在執行 C++ 程式碼的 Dataflow 執行器中,您可以加入以下的設定:

Java

  import java.lang.ProcessBuilder.Redirect;
  ...
      processbuilder.redirectError(Redirect.appendTo(errfile));
      processbuilder.redirectOutput(Redirect.appendTo(outFile));

Python

# Requires Apache Beam 2.34 or later.
stopping_times, bad_values = (
    integers
    | beam.Map(collatz.total_stopping_time).with_exception_handling(
        use_subprocess=True))

# Write the bad values to a side channel.
bad_values | 'WriteBadValues' >> beam.io.WriteToText(
    os.path.splitext(output_path)[0] + '-bad.txt')

處理記錄資訊

有許多用途都會處理上百萬個元素。程序執行成功通常只會產生少量記錄,或甚至不會產生記錄,因此您必須做出關於保留記錄資料的業務決策。舉例來說,除了單純記下所有資訊,也可考慮這些替代方案:

  • 如果記錄中來自執行成功的訊息沒有價值,那就不必保留。
  • 建立對記錄資料的取樣機制,比如每一萬筆記錄項目時取樣一次。如果執行的程序是同性質的 (比方說,程式碼中有很多遞迴產生出的記錄資料本質是相同的),這方法可在保留記錄資料及最佳化執行程序之間取得有效的平衡。

程序執行失敗時,就有可能會有大量的資料送入記錄中。可以處理大量的記錄錯誤訊息的有效策略,是讀取記錄項目的前幾行訊息,然後將這幾行推送至 Cloud Logging,系統就會將剩下的部分載入到 Cloud Storage 值區裡。這樣您就可以先看記錄檔中的前幾行錯誤訊息,有需要時再到 Cloud Storage 深入瞭解整個檔的內容。

檢查記錄檔的大小也很有用。如果檔案大小為零,那麼您可以毫無顧慮的忽略它,或是簡單地記下記錄檔沒有記錄。

從處理完成的程序取得資料

不建議使用 stdout 將運算結果回傳到 DoFn 函式。您的 C++ 程式碼會呼叫的其他程式碼,甚至包含您自己的程式碼,都有可能會對 stdout 發送訊息,進而汙染了含有記錄資料的 stdoutput 串流。您可以調整一下 C++ 的包裝函式,允許程式碼接收一個參數,指出在何處建立用於儲存數值的檔案。理想情形下,這個檔案應以各語言通用的方式存放在協議緩衝區裡,且允許 C++ 回傳物件給 Java 或 Python 程式碼,DoFn 物件可直接從檔案讀取結果,並將結果資訊傳送給其所屬的 output 呼叫。

經驗顯示對程序本身進行單元測試很重要。實作一個不須依靠 Dataflow 管道就能執行程序的單元測試是很重要的。如果程式庫的除錯能獨立運作且不需執行整個管道的話,會非常有效率。

為短 CPU 週期設計程序

呼叫子程序會造成額外負擔。視您的工作負載而定,您可能會需要多費點功夫,設法降低完成工作與開關程序的管理負擔之間的比例。

以媒體用途來說,驅動資料元件大小可能落在高 MB 量或甚至 GB 量級。因此,處理每個資料元素可能會需要不少時間。在這種情況下,與整個程序執行耗費的時間相比,呼叫子程序的成本幾乎可忽視。這種情況最好的處理方式,就是讓單一元素啟動自己所屬的程序。

但在別種用途時 (比如金融類),執行程序可能只需幾個非常短暫的 CPU 時間單位 (幾十毫秒)。這種情形下,呼叫子程序的負擔就不成比例的高了。解決方法是利用 Apache Beam 的 GroupByKey 轉換工具建立 50 至 100 個元素的批次,再傳送到程序裡。舉例來說,您可以按照下列步驟操作:

  • DoFn 函式中建立一個鍵/值組合。如果您處理的是金融類的交易資訊,可以用交易序號作為鍵值。如果您沒有專屬編號可作為鍵值,您可以先產生資料的校驗值,再使用模組函式分出 50 個元素。
  • 將鍵傳送至 GroupByKey.create 函式,該函式會傳回 KV<key,Iterable<data>> 集合,其中包含 50 個元素,您接著可以將這些元素傳送至程序。

限制工作站的平行處理量

若您使用的是 Dataflow 執行器可原生支援的程式碼語言,您可以放心讓工作站執行作業。Dataflow 有很多可以監看流程跟執行緒的程序,在批次或串流模式下皆適用。

但若您用的是 C++ 這類外部語言的話,則請注意,您將啟動子程序以進行一些特殊程序。在批次模式下,與串流模式相比,Dataflow 執行器對 CPU 使用了較少量的執行緒,所以建議您在類別中建立信號,尤其是在串流模式下,以利直接控制個別工作站的並行量。

舉例來說,在處理媒體檔時,您不會想看到上百個轉碼元素在單一工作站裡並行處理。在像這樣的案例中,您可以建立一個工具類別,為執行中的工作向 DoFn 函式提出認可。您可透過這個類別,直接控制在管道中的工作站執行緒。

在 Google Cloud使用高容量的資料接收器

資料處理完成後會傳送到資料接收器,此接收器必須能夠處理您的網格運算方案所產生的資料量。

下圖顯示當 Dataflow 執行網格工作負載時,可在 Google Cloud 中使用的接收器。

 Google Cloud可用的接收器

Bigtable、BigQuery 和 Pub/Sub 皆可處理非常大量的串流資料。比如,每個 Bigtable 節點每秒可以處理 10,000 筆,大小最大到 1K 的資料寫入量,同時還能輕鬆進行橫向擴充,因此,100 個節點組成的 Bigtable 叢集,每秒可接收 1,000,000 個由 Dataflow 網格所產生的訊息。

管理區段錯誤

在管道中使用 C++ 程式碼時,您需要決定如何管理區段錯誤,因為如果處理不當,這些錯誤會造成非本機的影響。Dataflow 執行器會視需要在 Java、Python 或 Go 中建立程序,然後以套裝組合的形式將工作指派給程序。

如果使用緊密耦合的工具 (例如 JNI 或 Cython) 呼叫 C++ 程式碼,且 C++ 程序發生區段錯誤,呼叫程序和 Java 虛擬機器 (JVM) 也會當機。在這種情況下,系統無法偵測到不良資料點。如要擷取不良資料點,請使用較鬆散的耦合,將不良資料分支出去,讓管道繼續運作。不過,如果 C++ 程式碼已臻成熟,且已針對所有資料變化完成全面測試,您可以使用 Cython 等機制。

後續步驟