Apache Flink

Apache Flink を統合すると、クライアント、JobManager、TaskManager のログが収集され、JSON ペイロードが生成されます。結果には、source、level、message のフィールドが含まれます。

Flink の詳細については、Apache Flink のドキュメントをご覧ください。

前提条件

Flink テレメトリーを収集するには、Ops エージェントをインストールする必要があります。

  • 指標の場合は、バージョン 2.18.1 以降をインストールします。
  • ログの場合は、バージョン 2.17.0 以降をインストールします。

このインテグレーションは Flink バージョン 1.12.5、1.13.6、1.14.4 をサポートします。

Flink 用に Ops エージェントを構成する

Ops エージェントの構成のガイドに従って、Flink インスタンスからテレメトリーを収集するために必要な要素を追加し、エージェントを再起動します。

構成の例

次のコマンドは、Flink のテレメトリーを収集して取り込み、Ops エージェントを再起動するための構成を作成します。

# Configures Ops Agent to collect telemetry from the app and restart Ops Agent.

set -e

# Create a back up of the existing file so existing configurations are not lost.
sudo cp /etc/google-cloud-ops-agent/config.yaml /etc/google-cloud-ops-agent/config.yaml.bak

# Configure the Ops Agent.
sudo tee /etc/google-cloud-ops-agent/config.yaml > /dev/null << EOF
metrics:
  receivers:
    flink:
      type: flink
  service:
    pipelines:
      flink:
        receivers:
          - flink
logging:
  receivers:
    flink:
      type: flink
  service:
    pipelines:
      flink:
        receivers:
          - flink
EOF

sudo service google-cloud-ops-agent restart
sleep 30

Flink からログを取り込むには、Flink が生成するログのレシーバーを作成してから、新しいレシーバー用のパイプラインを作成する必要があります。

flink ログのレシーバーを構成するには、次のフィールドを指定します。

フィールド デフォルト 説明
exclude_paths include_paths の照合で除外するファイルシステム パスのパターンのリスト。
include_paths [/opt/flink/log/flink-*-standalonesession-*.log, /opt/flink/log/flink-*-taskexecutor-*.log, /opt/flink/log/flink-*-client-*.log] 各ファイルのテーリングで読み込むファイルシステムのパスのリスト。パスにはワイルドカード(*)を使用できます。
record_log_file_path false true に設定すると、ログレコードの取得元のファイルのパスが agent.googleapis.com/log_file_path ラベルの値として出力ログエントリに表示されます。ワイルドカードを使用する場合、レコードを取得したファイルのパスのみが記録されます。
type この値は、flink にする必要があります。
wildcard_refresh_interval 60s include_paths のワイルドカード ファイルのパスの更新間隔。期間を指定します(例: 30s2m)。このプロパティは、ログファイルのローテーションがデフォルトの間隔よりも速く、ロギングのスループットが高い場合に有用です。

ログの内容

logName は、構成で指定されたレシーバ ID から取得されます。LogEntry 内の詳細なフィールドは、次のとおりです。

flink ログの LogEntry には次のフィールドが含まれます。

フィールド タイプ 説明
jsonPayload.level 文字列 ログエントリ レベル
jsonPayload.message 文字列 ログ メッセージ(指定した場合の詳細なスタックトレースを含む)
jsonPayload.source 文字列 ログエントリのソース Java クラス
severity 文字列(LogSeverity ログエントリ レベル(変換済み)。

Flink から指標を取り込むには、Flink が生成する指標のレシーバーを作成してから、新しいレシーバー用のパイプラインを作成する必要があります。

このレシーバーでは、複数のエンドポイントのモニタリングなど、構成で複数のインスタンスを使用することはできません。このようなインスタンスはすべて同じ時系列に書き込まれるため、Cloud Monitoring ではインスタンスを区別できません。

flink 指標のレシーバーを構成するには、次のフィールドを指定します。

フィールド デフォルト 説明
collection_interval 60s 期間の値(例: 30s5m)。
endpoint http://localhost:8081 Flink によって公開される URL。
type 値は、flink にする必要があります。

モニタリング対象

次の表に、Ops エージェントが Flink インスタンスから収集する指標の一覧を示します。

指標タイプ
種類、タイプ
モニタリング対象リソース
ラベル
workload.googleapis.com/flink.job.checkpoint.count
CUMULATIVEINT64
gce_instance
checkpoint
host_name
job_name
workload.googleapis.com/flink.job.checkpoint.in_progress
GAUGEINT64
gce_instance
host_name
job_name
workload.googleapis.com/flink.job.last_checkpoint.size
GAUGEINT64
gce_instance
host_name
job_name
workload.googleapis.com/flink.job.last_checkpoint.time
GAUGEINT64
gce_instance
host_name
job_name
workload.googleapis.com/flink.job.restart.count
CUMULATIVEINT64
gce_instance
host_name
job_name
workload.googleapis.com/flink.jvm.class_loader.classes_loaded
CUMULATIVEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.cpu.load
GAUGEDOUBLE
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.cpu.time
CUMULATIVEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.gc.collections.count
CUMULATIVEINT64
gce_instance
garbage_collector_name
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.gc.collections.time
CUMULATIVEINT64
gce_instance
garbage_collector_name
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.direct.total_capacity
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.direct.used
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.heap.committed
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.heap.max
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.heap.used
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.mapped.total_capacity
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.mapped.used
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.metaspace.committed
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.metaspace.max
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.metaspace.used
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.nonheap.committed
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.nonheap.max
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.nonheap.used
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.threads.count
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.memory.managed.total
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.memory.managed.used
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.operator.record.count
CUMULATIVEINT64
gce_instance
host_name
job_name
operator_name
record
subtask_index
task_name
taskmanager_id
workload.googleapis.com/flink.operator.watermark.output
GAUGEINT64
gce_instance
host_name
job_name
operator_name
subtask_index
task_name
taskmanager_id
workload.googleapis.com/flink.task.record.count
CUMULATIVEINT64
gce_instance
host_name
job_name
record
subtask_index
task_name
taskmanager_id

構成を確認する

このセクションでは、Flink レシーバが正しく構成されていることを確認する方法について説明します。Ops エージェントがテレメトリーの収集を開始するまでに 1~2 分かかる場合があります。

Flink ログが Cloud Logging に送信されていることを確認するには、次のようにします。

  1. Google Cloud コンソールで、[ログ エクスプローラ] ページに移動します。

    [ログ エクスプローラ] に移動

    検索バーを使用してこのページを検索する場合は、小見出しが「Logging」の結果を選択します。

  2. エディタに次のクエリを入力し、[クエリを実行] をクリックします。
    resource.type="gce_instance"
    log_id("flink")
    

Flink 指標が Cloud Monitoring に送信されていることを確認するには、次のようにします。

  1. Google Cloud コンソールで、[Metrics Explorer] ページに移動します。

    Metrics Explorer に移動

    検索バーを使用してこのページを検索する場合は、小見出しが [Monitoring] の結果を選択します。

  2. クエリビルダー ペインのツールバーで、[MQL] または [PROMQL] という名前のボタンを選択します。
  3. [言語] で [MQL] が選択されていることを確認します。言語切り替えボタンは、クエリの書式設定と同じツールバーにあります。
  4. エディタに次のクエリを入力し、[クエリを実行] をクリックします。
    fetch gce_instance
    | metric 'workload.googleapis.com/flink.jvm.memory.heap.used'
    | every 1m
    

ダッシュボードを表示する

Flink 指標を表示するには、グラフまたはダッシュボードが構成されている必要があります。Flink インテグレーションには、1 つ以上のダッシュボードが含まれています。インテグレーションを構成して Ops エージェントが指標データの収集を開始すると、ダッシュボードは自動的にインストールされます。

インテグレーションをインストールすることなく、ダッシュボードの静的プレビューを表示することもできます。

インストールされているダッシュボードを表示する手順は次のとおりです。

  1. Google Cloud コンソールで [ダッシュボード] ページに移動します。

    [ダッシュボード] に移動

    検索バーを使用してこのページを検索する場合は、小見出しが [Monitoring] の結果を選択します。

  2. [ダッシュボード リスト] タブを選択し、[統合] カテゴリを選択します。
  3. 表示するダッシュボードの名前をクリックします。

インテグレーションを構成してもダッシュボードがインストールされていない場合は、Ops エージェントが実行されていることを確認します。ダッシュボードにグラフの指標データがない場合、ダッシュボードのインストールは失敗します。Ops エージェントが指標の収集を開始した後に、ダッシュボードがインストールされます。

ダッシュボードの静的プレビューを表示する手順は次のとおりです。

  1. Google Cloud コンソールで [インテグレーション] ページに移動します。

    [インテグレーション] に移動

    検索バーを使用してこのページを検索する場合は、小見出しが [Monitoring] の結果を選択します。

  2. デプロイメント プラットフォーム フィルタの [Compute Engine] をクリックします。
  3. Flink のエントリを見つけて、[詳細を表示] をクリックします。
  4. [ダッシュボード] タブを選択すると、静的プレビューが表示されます。ダッシュボードがインストールされている場合は、[ダッシュボードを表示] をクリックして移動できます。

Cloud Monitoring のダッシュボードについて詳しくは、ダッシュボードとグラフをご覧ください。

[インテグレーション] ページの使用方法については、インテグレーションを管理するをご覧ください。

アラート ポリシーをインストールする

アラート ポリシーは、指定した条件が成立した際に通知するように Cloud Monitoring に指示します。Flink インテグレーションには、使用する 1 つ以上のアラート ポリシーが含まれています。これらのアラート ポリシーは、Monitoring の [インテグレーション] ページで表示してインストールできます。

使用可能なアラート ポリシーの説明を表示してインストールする手順は次のとおりです。

  1. Google Cloud コンソールで [統合] ページに移動します。

    [インテグレーション] に移動

    検索バーを使用してこのページを検索する場合は、小見出しが [Monitoring] の結果を選択します。

  2. Flink のエントリを見つけて、[詳細を表示] をクリックします。
  3. [アラート] タブを選択します。このタブには、利用可能なアラート ポリシーの説明と、それらをインストールするためのインターフェースが表示されます。
  4. アラート ポリシーをインストールします。アラート ポリシーでは、アラートがトリガーされた通知の送信先を特定する必要があるため、インストール環境の情報が必要になります。アラート ポリシーをインストールする手順は次のとおりです。
    1. 利用可能なアラート ポリシーのリストから、インストールするアラート ポリシーを選択します。
    2. [通知の構成] セクションで、1 つ以上の通知チャンネルを選択します。通知チャンネルの使用を無効にすることもできますが、無効にすると、アラート ポリシーは通知なく起動します。Monitoring でステータスを確認できますが、通知は受信しません。

      通知チャンネルの詳細については、通知チャンネルを管理するをご覧ください。

    3. [ポリシーの作成] をクリックします。

Cloud Monitoring のアラート ポリシーの詳細については、アラートの概要をご覧ください。

[インテグレーション] ページの使用方法については、インテグレーションを管理するをご覧ください。

次のステップ

Ansible を使用して Ops エージェントをインストールし、サードパーティ アプリケーションを構成してサンプル ダッシュボードをインストールする方法についてのチュートリアルは、Ops エージェントをインストールして、サードパーティ アプリケーションのトラブルシューティングを行うの動画をご覧ください。