Akamai Cloud Monitor のログを収集する
このドキュメントでは、AWS S3 を使用して Akamai Cloud Monitor(ロードバランサ、トラフィック シェイパー、ADC)ログを Google Security Operations に取り込む方法について説明します。Akamai は JSON イベントを HTTPS エンドポイントに push します。API Gateway + Lambda レシーバーは、イベントを S3(JSONL、gz)に書き込みます。パーサーは JSON ログを UDM に変換します。JSON ペイロードからフィールドを抽出し、データ型の変換を行い、UDM スキーマに合わせてフィールドの名前を変更し、カスタム フィールドと URL 構築の特定のロジックを処理します。また、フィールドの有無に基づいてエラー処理と条件付きロジックが組み込まれています。
始める前に
次の前提条件を満たしていることを確認してください。
- Google SecOps インスタンス
- Akamai Control Center と Property Manager への特権アクセス
- AWS*(S3、IAM、Lambda、API Gateway) への特権アクセス
Google SecOps 用に AWS S3 バケットと IAM を構成する
- バケットの作成のユーザーガイドに沿って、Amazon S3 バケットを作成します。
- 後で参照できるように、バケットの名前とリージョンを保存します(例:
akamai-cloud-monitor
)。 - IAM ユーザーの作成のユーザーガイドに沿って、ユーザーを作成します。
- 作成したユーザーを選択します。
- [セキュリティ認証情報] タブを選択します。
- [アクセスキー] セクションで [アクセスキーを作成] をクリックします。
- [ユースケース] で [サードパーティ サービス] を選択します。
- [次へ] をクリックします。
- 省略可: 説明タグを追加します。
- [アクセスキーを作成] をクリックします。
- [CSV ファイルをダウンロード] をクリックして、[アクセスキー] と [シークレット アクセスキー] を保存し、後で使用できるようにします。
- [完了] をクリックします。
- [権限] タブを選択します。
- [権限ポリシー] セクションで、[権限を追加] をクリックします。
- [権限を追加] を選択します。
- [ポリシーを直接アタッチする] を選択します。
- AmazonS3FullAccess ポリシーを検索して選択します。
- [次へ] をクリックします。
- [権限を追加] をクリックします。
S3 アップロードの IAM ポリシーとロールを構成する(Lambda)
- AWS コンソールで、[IAM] > [ポリシー] > [ポリシーの作成] > [JSON] に移動し、以下のポリシーを貼り付けます。
JSON ポリシー(
akamai-cloud-monitor
は S3 バケット名に置き換えます):{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutAkamaiObjects", "Effect": "Allow", "Action": ["s3:PutObject"], "Resource": "arn:aws:s3:::akamai-cloud-monitor/*" } ] }
[次へ] > [ポリシーを作成] をクリックします。
[IAM] > [ロール] > [ロールの作成] > [AWS サービス] > [Lambda] に移動します。
JSON ポリシーをアタッチします。
ロールに「
WriteAkamaiCMToS3Role
」という名前を付けて、[ロールを作成] をクリックします。
Lambda 関数を作成する
設定 | 値 |
---|---|
名前 | akamai_cloud_monitor_to_s3 |
ランタイム | Python 3.13 |
アーキテクチャ | x86_64 |
実行ロール | WriteAkamaiCMToS3Role |
関数を作成したら、[コード] タブを開き、スタブを削除して次のコード(
akamai_cloud_monitor_to_s3.py
)を入力します。#!/usr/bin/env python3 # Lambda: Receive Akamai Cloud Monitor POST, write JSONL (gz) to S3 import os, json, gzip, io, uuid, base64, datetime as dt import boto3 S3_BUCKET = os.environ["S3_BUCKET_NAME"] S3_PREFIX = os.environ.get("S3_PREFIX", "akamai/cloud-monitor/json/").strip("/") + "/" INGEST_TOKEN = os.environ.get("INGEST_TOKEN") # optional shared secret in URL query (?token=...) s3 = boto3.client("s3") def _write_jsonl_gz(objs: list) -> str: key = f"{dt.datetime.utcnow():%Y/%m/%d}/akamai-cloud-monitor-{uuid.uuid4()}.json.gz" buf = io.BytesIO() with gzip.GzipFile(fileobj=buf, mode="w") as gz: for o in objs: gz.write((json.dumps(o, separators=(",", ":")) + "n").encode()) buf.seek(0) s3.upload_fileobj( buf, S3_BUCKET, f"{S3_PREFIX}{key}", ExtraArgs={ "ContentType": "application/json", "ContentEncoding": "gzip", }, ) return f"s3://{S3_BUCKET}/{S3_PREFIX}{key}" def _parse_records_from_event(event) -> list: # HTTP API (Lambda proxy) event: body is a JSON string body = event.get("body") or "" if event.get("isBase64Encoded"): body = base64.b64decode(body).decode("utf-8", "replace") try: data = json.loads(body) except Exception: # accept line-delimited JSON as pass-through try: return [json.loads(line) for line in body.splitlines() if line.strip()] except Exception: return [] if isinstance(data, list): return data if isinstance(data, dict): return [data] return [] def lambda_handler(event, context=None): # Optional shared-secret verification via query parameter (?token=...) if INGEST_TOKEN: qs = event.get("queryStringParameters") or {} token = qs.get("token") if token != INGEST_TOKEN: return {"statusCode": 403, "body": "forbidden"} records = _parse_records_from_event(event) if not records: return {"statusCode": 204, "body": "no content"} key = _write_jsonl_gz(records) return { "statusCode": 200, "headers": {"Content-Type": "application/json"}, "body": json.dumps({"ok": True, "s3_key": key, "count": len(records)}), }
[構成> 環境変数> 編集] に移動します。
[新しい環境変数を追加] をクリックして、次の値を設定します。
環境変数
キー 例 S3_BUCKET_NAME
akamai-cloud-monitor
S3_PREFIX
akamai/cloud-monitor/json/
INGEST_TOKEN
random-shared-secret
[構成] > [全般設定] に移動します。
[編集] をクリックし、[タイムアウト] を [5 分(300 秒)] に設定します。
[保存] をクリックします。
Amazon API Gateway(Akamai の HTTPS エンドポイント)を作成する
- AWS コンソールで、[API Gateway] > [Create API] に移動します。
- [HTTP API] > [ビルド] を選択します。
- 次の構成の詳細を入力します。
- 統合: [Lambda] を選択し、
akamai_cloud_monitor_to_s3
を選択します。 - ルート: ANY
/{proxy+}
を追加するか、特定のルート(POST/akamai/cloud-monitor
など)を作成します。 - ステージ: $default を作成または使用します。
- 統合: [Lambda] を選択し、
- API をデプロイし、[呼び出し URL](例:
https://abc123.execute-api.<region>.amazonaws.com
)をコピーします。
ログを push するように Akamai Cloud Monitor を構成する
- Akamai Control Center で、Property Manager の [Property] を開きます。
- [ルールを追加> [クラウド管理] を選択] をクリックします。
- Cloud Monitor Instrumentation を追加し、必要なデータセットを選択します。
- Cloud Monitor Data Delivery を追加します。
- 配信ホスト名: API Gateway 呼び出し URL(
abc123.execute-api.<region>.amazonaws.com
など)を入力します。 - 配信 URL パス: ルートと省略可能なクエリ トークン(例:
/akamai/cloud-monitor?token=<INGEST_TOKEN>
)。
- 配信ホスト名: API Gateway 呼び出し URL(
- プロパティ バージョンを保存して有効化します。
Akamai Cloud Monitor(S3 JSON)を取り込むように Google SecOps でフィードを構成する
- [SIEM 設定] > [フィード] に移動します。
- [Add New Feed] をクリックします。
- [フィード名] フィールドに、フィードの名前を入力します(例:
Akamai Cloud Monitor — S3
)。 - [ソースタイプ] として [Amazon S3 V2] を選択します。
- [ログタイプ] として [Akamai Cloud Monitor] を選択します。
- [次へ] をクリックします。
- 次の入力パラメータの値を指定します。
- S3 URI:
s3://akamai-cloud-monitor/akamai/cloud-monitor/json/
- ソース削除オプション: 転送後にファイルやディレクトリを削除するかどうか。
- ファイルの最大経過日数: 指定した日数以内に変更されたファイルを含めます。デフォルトは 180 日です。
- アクセスキー ID: 20 文字の英数字のアカウント アクセスキー(例: AKIAIOSFODNN7EXAMPLE)。
- シークレット アクセスキー: 40 文字の英数字のアカウント シークレット アクセスキー(例: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY)。
- アセットの Namespace:
akamai.cloud_monitor
- 取り込みラベル: このフィードのすべてのイベントにラベルが追加されます(
source=akamai_cloud_monitor
、format=json
など)。
- S3 URI:
- [次へ] をクリックします。
- [Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。
UDM マッピング テーブル
ログフィールド | UDM マッピング | ロジック |
---|---|---|
accLang |
network.http.user_agent |
「-」または空の文字列でない場合、直接マッピングされます。 |
city |
principal.location.city |
「-」または空の文字列でない場合、直接マッピングされます。 |
cliIP |
principal.ip |
空の文字列でない場合、直接マッピングされます。 |
country |
principal.location.country_or_region |
「-」または空の文字列でない場合、直接マッピングされます。 |
cp |
additional.fields |
キー「cp」で Key-Value ペアとしてマッピングされます。 |
customField |
about.ip 、about.labels 、src.ip |
Key-Value ペアとして解析されます。「eIp」と「pIp」をそれぞれ src.ip と about.ip にマッピングするための特別な処理。他のキーは about 内のラベルとしてマッピングされます。 |
errorCode |
security_result.summary 、security_result.severity |
存在する場合は、security_result.severity を「ERROR」に設定し、値を security_result.summary にマッピングします。 |
geo.city |
principal.location.city |
city が「-」または空の文字列の場合、直接マッピングされます。 |
geo.country |
principal.location.country_or_region |
country が「-」または空の文字列の場合、直接マッピングされます。 |
geo.lat |
principal.location.region_latitude |
直接マッピングされ、浮動小数点数に変換されます。 |
geo.long |
principal.location.region_longitude |
直接マッピングされ、浮動小数点数に変換されます。 |
geo.region |
principal.location.state |
直接マッピングされます。 |
id |
metadata.product_log_id |
空の文字列でない場合、直接マッピングされます。 |
message.cliIP |
principal.ip |
cliIP が空の文字列の場合、直接マッピングされます。 |
message.fwdHost |
principal.hostname |
直接マッピングされます。 |
message.reqHost |
target.hostname 、target.url |
target.url の作成と target.hostname の抽出に使用されます。 |
message.reqLen |
network.sent_bytes |
直接マッピングされます。totalBytes が空または「-」の場合、符号なし整数に変換されます。 |
message.reqMethod |
network.http.method |
reqMethod が空の文字列の場合、直接マッピングされます。 |
message.reqPath |
target.url |
target.url に追加されました。 |
message.reqPort |
target.port |
直接マッピングされ、reqPort が空の文字列の場合は整数に変換されます。 |
message.respLen |
network.received_bytes |
直接マッピングされ、符号なし整数に変換されます。 |
message.sslVer |
network.tls.version |
直接マッピングされます。 |
message.status |
network.http.response_code |
直接マッピングされます。statusCode が空または「-」の場合、整数に変換されます。 |
message.UA |
network.http.user_agent |
UA が「-」または空の文字列の場合、直接マッピングされます。 |
network.asnum |
additional.fields |
キー「asnum」を持つ Key-Value ペアとしてマッピングされます。 |
network.edgeIP |
intermediary.ip |
直接マッピングされます。 |
network.network |
additional.fields |
キー「network」で Key-Value ペアとしてマッピングされます。 |
network.networkType |
additional.fields |
キー「networkType」で Key-Value ペアとしてマッピングされます。 |
proto |
network.application_protocol |
network.application_protocol の決定に使用されます。 |
queryStr |
target.url |
「-」または空の文字列でない場合、target.url に追加されます。 |
referer |
network.http.referral_url 、about.hostname |
「-」でない場合、直接マッピングされます。抽出されたホスト名は about.hostname にマッピングされます。 |
reqHost |
target.hostname 、target.url |
target.url の作成と target.hostname の抽出に使用されます。 |
reqId |
metadata.product_log_id 、network.session_id |
id が空の文字列の場合、直接マッピングされます。network.session_id にもマッピングされます。 |
reqMethod |
network.http.method |
空の文字列でない場合、直接マッピングされます。 |
reqPath |
target.url |
「-」でない場合は target.url に追加されます。 |
reqPort |
target.port |
直接マッピングされ、整数に変換されます。 |
reqTimeSec |
metadata.event_timestamp 、timestamp |
イベントのタイムスタンプを設定するために使用されます。 |
start |
metadata.event_timestamp 、timestamp |
reqTimeSec が空の文字列の場合に、イベント タイムスタンプを設定するために使用されます。 |
statusCode |
network.http.response_code |
直接マッピングされます。「-」または空の文字列でない場合、整数に変換されます。 |
tlsVersion |
network.tls.version |
直接マッピングされます。 |
totalBytes |
network.sent_bytes |
空でないか「-」でない場合、直接マッピングされ、符号なし整数に変換されます。 |
type |
metadata.product_event_type |
直接マッピングされます。 |
UA |
network.http.user_agent |
「-」または空の文字列でない場合、直接マッピングされます。 |
version |
metadata.product_version |
直接マッピングされます。 |
xForwardedFor |
principal.ip |
「-」または空の文字列でない場合、直接マッピングされます。 |
(パーサー ロジック) | metadata.vendor_name |
「Akamai」に設定します。 |
(パーサー ロジック) | metadata.product_name |
「DataStream」に設定します。 |
(パーサー ロジック) | metadata.event_type |
「NETWORK_HTTP」に設定されます。 |
(パーサー ロジック) | metadata.product_version |
version が空の文字列の場合、「2」に設定します。 |
(パーサー ロジック) | metadata.log_type |
「AKAMAI_CLOUD_MONITOR」に設定します。 |
(パーサー ロジック) | network.application_protocol |
proto または message.proto から決定されます。どちらかに「HTTPS」が含まれている場合は「HTTPS」に設定し、それ以外の場合は「HTTP」に設定します(大文字と小文字を区別しない)。 |
(パーサー ロジック) | security_result.severity |
errorCode が「-」または空の文字列の場合は、「INFORMATIONAL」に設定します。 |
(パーサー ロジック) | target.url |
protocol 、reqHost (または message.reqHost )、reqPath (または message.reqPath )、queryStr から構築されます。 |
さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。