使用管道記錄

您可以使用 Apache Beam SDK 的內建記錄基礎架構,在執行管道時記錄資訊。您可以使用 Google Cloud 主控台,在管道執行期間和之後監控記錄資訊。

將記錄訊息加入管道中

Java

Java 適用的 Apache Beam SDK 建議您透過開放原始碼的 Simple Logging Facade for Java (SLF4J) 程式庫記錄工作站訊息。 Java 適用的 Apache Beam SDK 會實作必要的記錄基礎架構,因此 Java 程式碼僅須匯入 SLF4J API,接著將記錄器實例化,就可在管道程式碼中啟用訊息記錄功能。

如果是預先存在的程式碼和/或程式庫,Java 適用的 Apache Beam SDK 會設定額外的記錄基礎架構。系統會擷取下列 Java 記錄程式庫產生的記錄訊息:

Python

Python 適用的 Apache Beam SDK 提供 logging 程式庫套件,可讓管道工作站輸出記錄訊息。如要使用程式庫函式,您必須匯入程式庫:

import logging

Go

Go 適用的 Apache Beam SDK 提供 log 程式庫套件,可讓管道工作站輸出記錄訊息。如要使用程式庫函式,您必須匯入程式庫:

import "github.com/apache/beam/sdks/v2/go/pkg/beam/log"

工作站記錄訊息程式碼示例

Java

下列範例使用 SLF4J 進行 Dataflow 記錄。 如要進一步瞭解如何設定 SLF4J 以進行 Dataflow 記錄,請參閱「Java 提示」一文。

您可以修改 Apache Beam WordCount 示例,讓系統在處理完成的文字中發現「love」這個字詞時,輸出記錄訊息。在下方範例中,以粗體表示的文字為新增的程式碼。我們一併加入了前後方的程式碼,以提供完整資訊。

 package org.apache.beam.examples;
 // Import SLF4J packages.
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 ...
 public class WordCount {
   ...
   static class ExtractWordsFn extends DoFn<String, String> {
     // Instantiate Logger.
     // Suggestion: As shown, specify the class name of the containing class
     // (WordCount).
     private static final Logger LOG = LoggerFactory.getLogger(WordCount.class);
     ...
     @ProcessElement
     public void processElement(ProcessContext c) {
       ...
       // Output each word encountered into the output PCollection.
       for (String word : words) {
         if (!word.isEmpty()) {
           c.output(word);
         }
         // Log INFO messages when the word "love" is found.
         if(word.toLowerCase().equals("love")) {
           LOG.info("Found " + word.toLowerCase());
         }
       }
     }
   }
 ... // Remaining WordCount example code ...

Python

您可以修改 Apache Beam wordcount.py 範例,讓系統在處理完成的文字中發現「love」這個字詞時,輸出記錄訊息。

# import Python logging module.
import logging

class ExtractWordsFn(beam.DoFn):
  def process(self, element):
    words = re.findall(r'[A-Za-z\']+', element)
    for word in words:
      yield word

      if word.lower() == 'love':
        # Log using the root logger at info or higher levels
        logging.info('Found : %s', word.lower())

# Remaining WordCount example code ...

Go

您可以修改 Apache Beam wordcount.go 範例,讓系統在處理完成的文字中發現「love」這個字詞時,輸出記錄訊息。

func (f *extractFn) ProcessElement(ctx context.Context, line string, emit func(string)) {
    for _, word := range wordRE.FindAllString(line, -1) {
        // increment the counter for small words if length of words is
        // less than small_word_length
        if strings.ToLower(word) == "love" {
            log.Infof(ctx, "Found : %s", strings.ToLower(word))
        }

        emit(word)
    }
}

// Remaining Wordcount example

Java

如果經過修改的 WordCount 管道使用預設的「DirectRunner」在本機執行,且輸出結果會傳送至本機檔案 (--output=./local-wordcounts),則主控台輸出結果會包含下列新增的記錄訊息:

INFO: Executing pipeline using the DirectRunner.
...
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
...
INFO: Pipeline execution complete.

根據預設,只有標示 INFO 以上層級的記錄行會傳送至 Cloud Logging。如要變更這項行為,請參閱「設定管道工作站記錄層級」。

Python

如果經過修改的 WordCount 管道使用預設的「DirectRunner」在本機執行,且輸出結果會傳送至本機檔案 (--output=./local-wordcounts),則主控台輸出結果會包含下列新增的記錄訊息:

INFO:root:Found : love
INFO:root:Found : love
INFO:root:Found : love

根據預設,只有標示 INFO 以上層級的記錄行會傳送至 Cloud Logging。如要變更這項行為,請參閱「設定管道工作站記錄層級」。

請勿使用 logging.config 函式覆寫記錄設定,否則可能會停用預先設定的記錄處理常式,導致管道記錄無法傳輸至 Dataflow 和 Cloud Logging。

Go

如果經過修改的 WordCount 管道使用預設的「DirectRunner」在本機執行,且輸出結果會傳送至本機檔案 (--output=./local-wordcounts),則主控台輸出結果會包含下列新增的記錄訊息:

2022/05/26 11:36:44 Found : love
2022/05/26 11:36:44 Found : love
2022/05/26 11:36:44 Found : love

根據預設,只有標示 INFO 以上層級的記錄行會傳送至 Cloud Logging。

控制記錄檔量

您也可以變更管線的記錄層級,減少產生的記錄檔量。如要停止擷取部分或所有 Dataflow 記錄,請新增 Logging 排除項目,排除 Dataflow 記錄。然後將記錄檔匯出至其他目的地,例如 BigQuery、Cloud Storage 或 Pub/Sub。詳情請參閱控管 Dataflow 記錄的擷取作業

記錄限制和節流

每個工作站每 30 秒最多只能傳送 15,000 則工作站記錄訊息。 如果達到這項限制,系統會新增單一工作站記錄訊息,指出記錄受到節流:

Throttling logger worker. It used up its 30s quota for logs in only 12.345s
在 30 秒間隔結束前,系統不會再記錄任何訊息。 Apache Beam SDK 和使用者程式碼產生的記錄訊息共用此限制。

儲存及保留記錄檔

作業記錄會儲存在 _Default 記錄檔值區中。記錄 API 服務名稱為 dataflow.googleapis.com。如要進一步瞭解 Cloud Logging 中使用的 Google Cloud 受控資源類型和服務,請參閱「受控資源和服務」。

如要進一步瞭解 Logging 保留記錄項目的時間長度,請參閱「配額與限制:記錄保留期限」中的保留說明。

如要瞭解如何查看作業記錄,請參閱「監控及查看管道記錄」。

監控及查看管道記錄

Dataflow 服務上執行管道時,您可以使用 Dataflow 監控介面查看管道發出的記錄。

Dataflow 工作站記錄範例

您可以透過以下方式在雲端執行經過修改的 WordCount 管道:

Java

--project=WordCountExample
--output=gs://<bucket-name>/counts
--runner=DataflowRunner
--tempLocation=gs://<bucket-name>/temp
--stagingLocation=gs://<bucket-name>/binaries

Python

--project=WordCountExample
--output=gs://<bucket-name>/counts
--runner=DataflowRunner
--staging_location=gs://<bucket-name>/binaries

Go

--project=WordCountExample
--output=gs://<bucket-name>/counts
--runner=DataflowRunner
--staging_location=gs://<bucket-name>/binaries

查看記錄

由於 WordCount 雲端管道使用的是封鎖執行作業,因此主控台訊息是在管道執行期間輸出的。工作開始執行後,系統會將Google Cloud 主控台頁面的連結輸出到主控台,後接管道工作 ID:

INFO: To access the Dataflow monitoring console, please navigate to
https://console.developers.google.com/dataflow/job/2017-04-13_13_58_10-6217777367720337669
Submitted job: 2017-04-13_13_58_10-6217777367720337669

主控台網址會連結至 Dataflow 監控介面的已提交工作摘要頁面。頁面左側是動態執行圖表,右側是摘要資訊。按一下底部面板上的 ,展開記錄面板。

記錄面板預設會顯示回報整體工作狀態的「Job Logs」(工作記錄)。如要篩選記錄面板中顯示的訊息,請依序點選「資訊」「篩選記錄」

選取圖表中的管道步驟可查看程式碼產生的「Step Logs」(步驟記錄),以及在該管道步驟中執行的所產生程式碼。

如要返回「Job Logs」(工作記錄),請按一下圖表以外的地方,或使用右側面板中的「取消選取步驟」按鈕來清除步驟。

前往記錄檔探索工具

如要開啟記錄檔探索工具並選取不同記錄類型,請在記錄面板中按一下「在記錄檔探索工具中查看」 (外部連結按鈕)。

在記錄檔探索工具中,如要查看不同記錄類型的面板,請點選「記錄欄位」切換按鈕。

在「Logs Explorer」頁面中,查詢可能會依工作步驟或記錄類型篩選記錄。如要移除篩選條件,請按一下「顯示查詢」切換鈕,然後編輯查詢。

如要查看作業的所有可用記錄,請按照下列步驟操作:

  1. 在「Query」(查詢) 欄位中,輸入下列查詢:

    resource.type="dataflow_step"
    resource.labels.job_id="JOB_ID"
    

    JOB_ID 替換為工作 ID。

  2. 點選「執行查詢」

  3. 如果使用這項查詢後,沒有看到作業的記錄,請按一下「編輯時間」

  4. 調整開始時間和結束時間,然後按一下「套用」

記錄類型

記錄檔瀏覽器也提供管道的基礎架構記錄。使用錯誤和警告記錄,診斷觀察到的管道問題。基礎架構記錄中的錯誤和警告若與管道問題無關,不一定表示有問題。

以下摘要說明可在「記錄探索器」頁面中查看的各種記錄類型:

  • job-message 記錄包含 Dataflow 各種元件產生的工作等級訊息,例如自動調度資源設定、工作站或關閉時間、工作步驟進度以及工作錯誤。源於當機使用者程式碼的工作站等級錯誤以及存在於 worker 記錄裡的工作站等級錯誤,也會一併傳播至 job-message 記錄。
  • worker 記錄由 Dataflow 工作站產生。工作站完成大部分的管道作業工作 (例如將 ParDo 套用至資料)。 worker 記錄包含由您的程式碼和 Dataflow 記錄的訊息。
  • worker-startup 記錄存在於大部分的 Dataflow 工作中,可擷取與啟動程序相關的訊息。啟動程序包括從 Cloud Storage 下載工作的 Jar,接著啟動工作站。如果啟動工作站時發生問題,可以查看這類記錄。
  • harness 記錄包含來自 Runner v2 執行器架構的訊息。
  • shuffler 記錄包含合併平行管道作業結果的工作站訊息。
  • system 記錄包含工作站 VM 主機作業系統的訊息。在某些情況下,這些記錄可能會擷取程序當機或記憶體不足 (OOM) 事件。
  • dockerkubelet 記錄包含與 Dataflow 工作站使用的公開技術相關的訊息。
  • nvidia-mps 記錄檔包含有關 NVIDIA 多程序服務 (MPS) 作業的訊息。

設定管道工作站記錄層級

Java

Java 適用的 Apache Beam SDK 針對工作站所設定的預設 SLF4J 記錄層級為 INFO。因此會發出 INFO 或更高層級 (INFOWARNERROR) 的所有記錄訊息。您可以設定其他預設記錄層級,以支援較低的 SLF4J 記錄層級 (TRACEDEBUG),或針對程式碼中的各種類別套件設定不同的記錄層級。

以下管道選項可讓您透過指令列或程式來設定工作站記錄層級:

  • --defaultSdkHarnessLogLevel=<level>:使用這個選項將所有記錄器設為指定預設層級。舉例來說,以下指令列選項將覆寫預設的 Dataflow INFO 記錄層級,並將前述記錄層級設為 DEBUG:
    --defaultSdkHarnessLogLevel=DEBUG
  • --sdkHarnessLogLevelOverrides={"<package or class>":"<level>"}:使用此選項為指定的套件或類別設定記錄等級。舉例來說,如要覆寫 org.apache.beam.runners.dataflow 套件的預設管道記錄等級並將其設定為TRACE:
    --sdkHarnessLogLevelOverrides='{"org.apache.beam.runners.dataflow":"TRACE"}'
    如要進行多項覆寫,請提供 JSON 對應:
    (--sdkHarnessLogLevelOverrides={"<package/class>":"<level>","<package/class>":"<level>",...})。
  • 如果管道使用 Apache Beam SDK 2.50.0 以下版本,且未採用 Runner v2,則不支援 defaultSdkHarnessLogLevelsdkHarnessLogLevelOverrides 管道選項。在這種情況下,請使用 --defaultWorkerLogLevel=<level>--workerLogLevelOverrides={"<package or class>":"<level>"} 管道選項。如要進行多重覆寫,請提供 JSON 對應:
    (--workerLogLevelOverrides={"<package/class>":"<level>","<package/class>":"<level>",...})

以下範例運用可以指令列覆寫的預設值,透過程式來設定管道記錄選項:

 PipelineOptions options = ...
 SdkHarnessOptions loggingOptions = options.as(SdkHarnessOptions.class);
 // Overrides the default log level on the worker to emit logs at TRACE or higher.
 loggingOptions.setDefaultSdkHarnessLogLevel(LogLevel.TRACE);
 // Overrides the Foo class and "org.apache.beam.runners.dataflow" package to emit logs at WARN or higher.
 loggingOptions.getSdkHarnessLogLevelOverrides()
     .addOverrideForClass(Foo.class, LogLevel.WARN)
     .addOverrideForPackage(Package.getPackage("org.apache.beam.runners.dataflow"), LogLevel.WARN);

Python

Python 適用的 Apache Beam SDK 針對工作站所設定的預設記錄層級為 INFO。因此會發出 INFO 或更高層級 (INFOWARNINGERRORCRITICAL) 的所有記錄訊息。您可以設定其他預設記錄層級,以支援較低的記錄層級 (DEBUG),或針對程式碼中的不同模組設定不同的記錄層級。

以下兩種管道選項可讓您透過指令列或程式來設定工作站記錄層級:

  • --default_sdk_harness_log_level=<level>:使用這個選項將所有記錄器設為指定預設層級。舉例來說,以下指令列選項會覆寫預設的 Dataflow INFO 記錄層級,並將前述記錄層級設為 DEBUG
    --default_sdk_harness_log_level=DEBUG
  • --sdk_harness_log_level_overrides={\"<module>\":\"<level>\"}:使用此選項為指定模組設定記錄等級。舉例來說,如要覆寫 apache_beam.runners.dataflow 模組的預設管道記錄等級並將其設定為DEBUG:
    --sdk_harness_log_level_overrides={\"apache_beam.runners.dataflow\":\"DEBUG\"}
    如要進行多個覆寫作業,請提供 JSON 對應:
    (--sdk_harness_log_level_overrides={\"<module>\":\"<level>\",\"<module>\":\"<level>\",...})。

以下範例使用 WorkerOptions 類別,以程式輔助方式設定管道記錄選項,這些選項可透過指令列覆寫:

  from apache_beam.options.pipeline_options import PipelineOptions, WorkerOptions

  pipeline_args = [
    '--project=PROJECT_NAME',
    '--job_name=JOB_NAME',
    '--staging_location=gs://STORAGE_BUCKET/staging/',
    '--temp_location=gs://STORAGE_BUCKET/tmp/',
    '--region=DATAFLOW_REGION',
    '--runner=DataflowRunner'
  ]

  pipeline_options = PipelineOptions(pipeline_args)
  worker_options = pipeline_options.view_as(WorkerOptions)
  worker_options.default_sdk_harness_log_level = 'WARNING'

  # Note: In Apache Beam SDK 2.42.0 and earlier versions, use ['{"apache_beam.runners.dataflow":"WARNING"}']
  worker_options.sdk_harness_log_level_overrides = {"apache_beam.runners.dataflow":"WARNING"}

  # Pass in pipeline options during pipeline creation.
  with beam.Pipeline(options=pipeline_options) as pipeline:

更改下列內容:

  • PROJECT_NAME:專案名稱
  • JOB_NAME:工作名稱
  • STORAGE_BUCKET:Cloud Storage 名稱
  • DATAFLOW_REGION:要部署 Dataflow 工作的區域

    --region 標記會覆寫中繼資料伺服器、本機用戶端或環境變數中設定的預設地區。

Go

Go 適用的 Apache Beam SDK 不支援這項功能。

查看已啟動 BigQuery 工作的記錄

在 Dataflow 管道中使用 BigQuery 時,系統會啟動 BigQuery 作業,代表您執行各種動作。這些動作可能包括載入資料、匯出資料和其他類似工作。為進行疑難排解和監控,Dataflow 監控介面的「記錄」面板會提供這些 BigQuery 工作的額外資訊。

「記錄」面板中顯示的 BigQuery 工作資訊,是從 BigQuery 系統資料表儲存及載入。查詢基礎 BigQuery 資料表時,會產生帳單費用

查看 BigQuery 工作詳細資料

如要查看 BigQuery 工作資訊,管道必須使用 Apache Beam 2.24.0 以上版本。

如要列出 BigQuery 工作,請開啟「BigQuery Jobs」(BigQuery 工作) 分頁,然後選取 BigQuery 工作的位置。接著,按一下「Load BigQuery Jobs」(載入 BigQuery 工作),然後確認對話方塊。查詢完成後,系統會顯示工作清單。

BigQuery 工作資訊表中的「載入 BigQuery 工作」按鈕

系統會提供各項工作的基本資訊,包括工作 ID、類型、時間長度和其他詳細資料。

這個表格會顯示在目前管道工作執行期間執行的 BigQuery 工作。

如要查看特定工作的詳細資訊,請按一下「詳細資訊」欄中的「指令列」

在指令列的模式視窗中,複製 bq jobs describe 指令,然後在本機或 Cloud Shell 中執行。

gcloud alpha bq jobs describe BIGQUERY_JOB_ID

bq jobs describe 指令會輸出 JobStatistics,提供更多詳細資料,有助於診斷緩慢或停滯的 BigQuery 工作。

或者,您也可以搭配 SQL 查詢使用 BigQueryIO,系統會發出查詢作業。如要查看作業使用的 SQL 查詢,請按一下「詳細資訊」欄中的「查看查詢」

查看診斷資訊

「記錄」窗格的「診斷」分頁會收集並顯示管道中產生的特定記錄檔項目。這些項目包括指出管道可能發生問題的訊息,以及含有堆疊追蹤的錯誤訊息。系統會將收集到的記錄項目去重複,並合併為錯誤群組

「診斷」分頁,顯示含有「服務錯誤」錯誤群組的 Dataflow 工作。

錯誤報告包含下列資訊:

  • 錯誤清單,其中提供錯誤訊息
  • 每個錯誤發生的次數
  • 顯示每個錯誤發生時間的直方圖
  • 錯誤最近一次發生的時間
  • 錯誤首次發生的時間
  • 錯誤狀態

如要查看特定錯誤的錯誤報告,請按一下「錯誤」欄下方的說明。系統隨即會顯示「錯誤報告」頁面。 如果錯誤是服務錯誤,系統會顯示「疑難排解指南」連結。

Dataflow 服務錯誤的錯誤群組詳細資料頁面。

如要進一步瞭解這個頁面,請參閱「查看及篩選錯誤」。

略過錯誤

如要將錯誤訊息設為靜音,請按照下列步驟操作:

  1. 開啟「診斷」分頁。
  2. 按一下要靜音的錯誤。
  3. 開啟解決狀態選單。狀態標籤如下:「開啟」、「已確認」、「已解決」或「已靜音」
  4. 選取「已靜音」