Censys のログを収集する
このドキュメントでは、Amazon S3 を使用して Censys ログを Google Security Operations に取り込む方法について説明します。Censys は、API を通じて包括的な攻撃対象領域管理とインターネット インテリジェンスを提供します。この統合により、Censys ASM からホスト検出イベント、リスクイベント、アセットの変更を収集し、分析とモニタリングのために Google SecOps に転送できます。パーサーは、未加工ログを Google SecOps UDM に準拠した構造化形式に変換します。未加工のログ メッセージからフィールドを抽出し、データ型の変換を行い、抽出された情報を対応する UDM フィールドにマッピングして、追加のコンテキストとラベルでデータを拡充します。
始める前に
次の前提条件を満たしていることを確認してください。
- Google SecOps インスタンス
- Censys ASM への特権アクセス
- AWS(S3、IAM、Lambda、EventBridge)への特権アクセス
Censys の前提条件(API 認証情報)を収集する
app.censys.io
で Censys ASM Console にログインします。- ページ上部の [インテグレーション] に移動します。
- API キーと組織 ID をコピーして保存します。
- API ベース URL(
https://api.platform.censys.io
)をメモします。
Google SecOps 用に AWS S3 バケットと IAM を構成する
- バケットの作成のユーザーガイドに沿って、Amazon S3 バケットを作成します。
- 後で参照できるように、バケットの名前とリージョンを保存します(例:
censys-logs
)。 - IAM ユーザーの作成のユーザーガイドに沿って、ユーザーを作成します。
- 作成したユーザーを選択します。
- [セキュリティ認証情報] タブを選択します。
- [アクセスキー] セクションで [アクセスキーを作成] をクリックします。
- [ユースケース] で [サードパーティ サービス] を選択します。
- [次へ] をクリックします。
- 省略可: 説明タグを追加します。
- [アクセスキーを作成] をクリックします。
- [CSV ファイルをダウンロード] をクリックして、[アクセスキー] と [シークレット アクセスキー] を保存し、後で使用できるようにします。
- [完了] をクリックします。
- [権限] タブを選択します。
- [権限ポリシー] セクションで、[権限を追加] をクリックします。
- [権限を追加] を選択します。
- [ポリシーを直接アタッチする] を選択します。
- AmazonS3FullAccess ポリシーを検索して選択します。
- [次へ] をクリックします。
- [権限を追加] をクリックします。
S3 アップロードの IAM ポリシーとロールを構成する
- AWS コンソールで、[IAM> ポリシー> ポリシーの作成> JSON タブ] に移動します。
次のポリシーを入力します。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::censys-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::censys-logs/censys/state.json" } ] }
- 別のバケット名を入力した場合は、
censys-logs
を置き換えます。
- 別のバケット名を入力した場合は、
[次へ] > [ポリシーを作成] をクリックします。
[IAM] > [ロール] > [ロールの作成] > [AWS サービス] > [Lambda] に移動します。
新しく作成したポリシーと、AWSLambdaBasicExecutionRole マネージド ポリシー(CloudWatch Logs へのアクセス用)を関連付けます。
ロールに「
censys-lambda-role
」という名前を付けて、[ロールを作成] をクリックします。
Lambda 関数を作成する
- AWS コンソールで、[Lambda] > [Functions] > [Create function] に移動します。
- [Author from scratch] をクリックします。
- 次の構成情報を提供してください。
設定 | 値 |
---|---|
名前 | censys-data-collector |
ランタイム | Python 3.13 |
アーキテクチャ | x86_64 |
実行ロール | censys-lambda-role |
関数を作成したら、[コード] タブを開き、スタブを削除して次のコード(
censys-data-collector.py
)を入力します。import json import boto3 import urllib3 import gzip import logging import os from datetime import datetime, timedelta, timezone from typing import Dict, List, Any, Optional from urllib.parse import urlencode # Configure logging logger = logging.getLogger() logger.setLevel(logging.INFO) # AWS S3 client s3_client = boto3.client('s3') # HTTP client http = urllib3.PoolManager() # Environment variables S3_BUCKET = os.environ['S3_BUCKET'] S3_PREFIX = os.environ['S3_PREFIX'] STATE_KEY = os.environ['STATE_KEY'] CENSYS_API_KEY = os.environ['CENSYS_API_KEY'] CENSYS_ORG_ID = os.environ['CENSYS_ORG_ID'] API_BASE = os.environ.get('API_BASE', 'https://api.platform.censys.io') class CensysCollector: def __init__(self): self.headers = { 'Authorization': f'Bearer {CENSYS_API_KEY}', 'X-Organization-ID': CENSYS_ORG_ID, 'Content-Type': 'application/json' } def get_last_collection_time(self) -> Optional[datetime]: """Get the last collection timestamp from S3 state file.""" try: response = s3_client.get_object(Bucket=S3_BUCKET, Key=STATE_KEY) state = json.loads(response['Body'].read().decode('utf-8')) return datetime.fromisoformat(state.get('last_collection_time', '2024-01-01T00:00:00Z')) except Exception as e: logger.info(f"No state file found or error reading state: {e}") return datetime.now(timezone.utc) - timedelta(hours=1) def save_collection_time(self, collection_time: datetime): """Save the current collection timestamp to S3 state file.""" state = {'last_collection_time': collection_time.strftime('%Y-%m-%dT%H:%M:%SZ')} s3_client.put_object( Bucket=S3_BUCKET, Key=STATE_KEY, Body=json.dumps(state), ContentType='application/json' ) def collect_logbook_events(self, cursor: str = None) -> List[Dict[str, Any]]: """Collect logbook events from Censys ASM API using cursor-based pagination.""" events = [] url = f"{API_BASE}/v3/logbook" # Use cursor-based pagination as per Censys API documentation params = {} if cursor: params['cursor'] = cursor try: query_string = urlencode(params) if params else '' full_url = f"{url}?{query_string}" if query_string else url response = http.request('GET', full_url, headers=self.headers) if response.status != 200: logger.error(f"API request failed with status {response.status}: {response.data}") return [] data = json.loads(response.data.decode('utf-8')) events.extend(data.get('logbook_entries', [])) # Handle cursor-based pagination next_cursor = data.get('next_cursor') if next_cursor: events.extend(self.collect_logbook_events(next_cursor)) logger.info(f"Collected {len(events)} logbook events") return events except Exception as e: logger.error(f"Error collecting logbook events: {e}") return [] def collect_risks_events(self) -> List[Dict[str, Any]]: """Collect risk events from Censys ASM API.""" events = [] url = f"{API_BASE}/v3/risks" try: response = http.request('GET', url, headers=self.headers) if response.status != 200: logger.error(f"API request failed with status {response.status}: {response.data}") return [] data = json.loads(response.data.decode('utf-8')) events.extend(data.get('risks', [])) logger.info(f"Collected {len(events)} risk events") return events except Exception as e: logger.error(f"Error collecting risk events: {e}") return [] def save_events_to_s3(self, events: List[Dict[str, Any]], event_type: str): """Save events to S3 in compressed NDJSON format.""" if not events: return timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S') filename = f"{S3_PREFIX}{event_type}_{timestamp}.json.gz" try: # Convert events to newline-delimited JSON ndjson_content = 'n'.join(json.dumps(event, separators=(',', ':')) for event in events) # Compress with gzip gz_bytes = gzip.compress(ndjson_content.encode('utf-8')) s3_client.put_object( Bucket=S3_BUCKET, Key=filename, Body=gz_bytes, ContentType='application/gzip', ContentEncoding='gzip' ) logger.info(f"Saved {len(events)} {event_type} events to {filename}") except Exception as e: logger.error(f"Error saving {event_type} events to S3: {e}") raise def lambda_handler(event, context): """AWS Lambda handler function.""" try: collector = CensysCollector() # Get last collection time for cursor state management last_collection_time = collector.get_last_collection_time() current_time = datetime.now(timezone.utc) logger.info(f"Collecting events since {last_collection_time}") # Collect different types of events logbook_events = collector.collect_logbook_events() risk_events = collector.collect_risks_events() # Save events to S3 collector.save_events_to_s3(logbook_events, 'logbook') collector.save_events_to_s3(risk_events, 'risks') # Update state collector.save_collection_time(current_time) return { 'statusCode': 200, 'body': json.dumps({ 'message': 'Censys data collection completed successfully', 'logbook_events': len(logbook_events), 'risk_events': len(risk_events), 'collection_time': current_time.strftime('%Y-%m-%dT%H:%M:%SZ') }) } except Exception as e: logger.error(f"Lambda execution failed: {str(e)}") return { 'statusCode': 500, 'body': json.dumps({ 'error': str(e) }) }
[構成> 環境変数 > 編集 > 新しい環境変数を追加] に移動します。
次の環境変数を入力し、実際の値に置き換えます。
キー 値の例 S3_BUCKET
censys-logs
S3_PREFIX
censys/
STATE_KEY
censys/state.json
CENSYS_API_KEY
<your-censys-api-key>
CENSYS_ORG_ID
<your-organization-id>
API_BASE
https://api.platform.censys.io
関数が作成されたら、そのページにとどまるか、[Lambda] > [関数] > [your-function] を開きます。
[CONFIGURATION] タブを選択します。
[全般設定] パネルで、[編集] をクリックします。
[Timeout] を [5 minutes (300 seconds)] に変更し、[Save] をクリックします。
EventBridge スケジュールを作成する
- Amazon EventBridge > Scheduler > スケジュールの作成に移動します。
- 次の構成の詳細を入力します。
- 定期的なスケジュール: レート(
1 hour
)。 - ターゲット: Lambda 関数
censys-data-collector
。 - 名前:
censys-data-collector-1h
- 定期的なスケジュール: レート(
- [スケジュールを作成] をクリックします。
省略可: Google SecOps 用の読み取り専用の IAM ユーザーと鍵を作成する
- AWS コンソールで、[IAM] > [Users] > [Add users] に移動します。
- [ユーザーを追加] をクリックします。
- 次の構成の詳細を入力します。
- ユーザー:
secops-reader
。 - アクセスタイプ: アクセスキー - プログラマティック アクセス。
- ユーザー:
- [ユーザーを作成] をクリックします。
- 最小限の読み取りポリシー(カスタム)を関連付ける: [ユーザー] > [secops-reader] > [権限] > [権限を追加] > [ポリシーを直接関連付ける] > [ポリシーを作成]。
JSON エディタで、次のポリシーを入力します。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::censys-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::censys-logs" } ] }
名前を
secops-reader-policy
に設定します。[ポリシーの作成> 検索/選択> 次へ> 権限を追加] に移動します。
[セキュリティ認証情報] > [アクセスキー] > [アクセスキーを作成] に移動します。
CSV をダウンロードします(これらの値はフィードに入力されます)。
Censys のログを取り込むように Google SecOps でフィードを構成する
- [SIEM 設定] > [フィード] に移動します。
- [+ 新しいフィードを追加] をクリックします。
- [フィード名] フィールドに、フィードの名前を入力します(例:
Censys logs
)。 - [ソースタイプ] として [Amazon S3 V2] を選択します。
- [ログタイプ] として [CENSYS] を選択します。
- [次へ] をクリックします。
- 次の入力パラメータの値を指定します。
- S3 URI:
s3://censys-logs/censys/
- Source deletion options: 必要に応じて削除オプションを選択します。
- ファイルの最大経過日数: 指定した日数以内に変更されたファイルを含めます。デフォルトは 180 日です。
- アクセスキー ID: S3 バケットにアクセスできるユーザー アクセスキー。
- シークレット アクセスキー: S3 バケットにアクセスできるユーザーのシークレット キー。
- Asset namespace: アセットの名前空間。
- Ingestion labels: このフィードのイベントに適用されるラベル。
- S3 URI:
- [次へ] をクリックします。
- [Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。
UDM マッピング テーブル
ログフィールド | UDM マッピング | ロジック |
---|---|---|
assetId | read_only_udm.principal.asset.hostname | assetId フィールドが IP アドレスでない場合、principal.asset.hostname にマッピングされます。 |
assetId | read_only_udm.principal.asset.ip | assetId フィールドが IP アドレスの場合、principal.asset.ip にマッピングされます。 |
assetId | read_only_udm.principal.hostname | assetId フィールドが IP アドレスでない場合は、principal.hostname にマッピングされます。 |
assetId | read_only_udm.principal.ip | assetId フィールドが IP アドレスの場合、principal.ip にマッピングされます。 |
associatedAt | read_only_udm.security_result.detection_fields.value | associatedAt フィールドは security_result.detection_fields.value にマッピングされます。 |
autonomousSystem.asn | read_only_udm.additional.fields.value.string_value | autonomousSystem.asn フィールドは文字列に変換され、キー「autonomousSystem_asn」を持つ additional.fields.value.string_value にマッピングされます。 |
autonomousSystem.bgpPrefix | read_only_udm.additional.fields.value.string_value | autonomousSystem.bgpPrefix フィールドは、キーが「autonomousSystem_bgpPrefix」の additional.fields.value.string_value にマッピングされます。 |
バナー | read_only_udm.principal.resource.attribute.labels.value | バナー フィールドは、キー「banner」を使用して principal.resource.attributes.labels.value にマッピングされます。 |
クラウド | read_only_udm.metadata.vendor_name | cloud フィールドは metadata.vendor_name にマッピングされます。 |
comments.refUrl | read_only_udm.network.http.referral_url | comments.refUrl フィールドは network.http.referral_url にマッピングされます。 |
data.cve | read_only_udm.additional.fields.value.string_value | data.cve フィールドは、キー「data_cve」を使用して additional.fields.value.string_value にマッピングされます。 |
data.cvss | read_only_udm.additional.fields.value.string_value | data.cvss フィールドは、キー「data_cvss」を持つ additional.fields.value.string_value にマッピングされます。 |
data.ipAddress | read_only_udm.principal.asset.ip | data.ipAddress フィールドが assetId フィールドと等しくない場合、principal.asset.ip にマッピングされます。 |
data.ipAddress | read_only_udm.principal.ip | data.ipAddress フィールドが assetId フィールドと等しくない場合、principal.ip にマッピングされます。 |
data.location.city | read_only_udm.principal.location.city | location.city フィールドが空の場合、data.location.city フィールドは principal.location.city にマッピングされます。 |
data.location.countryCode | read_only_udm.principal.location.country_or_region | location.country フィールドが空の場合、data.location.countryCode フィールドは principal.location.country_or_region にマッピングされます。 |
data.location.latitude | read_only_udm.principal.location.region_coordinates.latitude | location.coordinates.latitude フィールドと location.geoCoordinates.latitude フィールドが空の場合、data.location.latitude フィールドは float に変換され、principal.location.region_coordinates.latitude にマッピングされます。 |
data.location.longitude | read_only_udm.principal.location.region_coordinates.longitude | location.coordinates.longitude フィールドと location.geoCoordinates.longitude フィールドが空の場合、data.location.longitude フィールドは float に変換され、principal.location.region_coordinates.longitude にマッピングされます。 |
data.location.province | read_only_udm.principal.location.state | location.province フィールドが空の場合、data.location.province フィールドは principal.location.state にマッピングされます。 |
data.mailServers | read_only_udm.additional.fields.value.list_value.values.string_value | data.mailServers 配列の各要素は、キー「Mail Servers」と値を持つ個別の additional.fields エントリにマッピングされます。list_value.values.string_value は要素の値に設定されます。 |
data.names.forwardDns[].name | read_only_udm.network.dns.questions.name | data.names.forwardDns 配列の各要素は、別個の network.dns.questions エントリにマッピングされ、その name フィールドは要素の name フィールドに設定されます。 |
data.nameServers | read_only_udm.additional.fields.value.list_value.values.string_value | data.nameServers 配列の各要素は、キーが「Name nameServers」、value.list_value.values.string_value が要素の値に設定された個別の additional.fields エントリにマッピングされます。 |
data.protocols[].transportProtocol | read_only_udm.network.ip_protocol | data.protocols[].transportProtocol フィールドが TCP、EIGRP、ESP、ETHERIP、GRE、ICMP、IGMP、IP6IN4、PIM、UDP、VRRP のいずれかの場合、network.ip_protocol にマッピングされます。 |
data.protocols[].transportProtocol | read_only_udm.principal.resource.attribute.labels.value | data.protocols[].transportProtocol フィールドは、キー「data_protocols {index}」を使用して principal.resource.attribute.labels.value にマッピングされます。 |
http.request.headers[].key、http.request.headers[].value.headers.0 | read_only_udm.network.http.user_agent | http.request.headers[].key フィールドが「User-Agent」の場合、対応する http.request.headers[].value.headers.0 フィールドは network.http.user_agent にマッピングされます。 |
http.request.headers[].key、http.request.headers[].value.headers.0 | read_only_udm.network.http.parsed_user_agent | http.request.headers[].key フィールドが「User-Agent」の場合、対応する http.request.headers[].value.headers.0 フィールドはユーザー エージェント文字列として解析され、network.http.parsed_user_agent にマッピングされます。 |
http.request.headers[].key、http.request.headers[].value.headers.0 | read_only_udm.principal.resource.attribute.labels.key、read_only_udm.principal.resource.attribute.labels.value | http.request.headers 配列の各要素について、key フィールドは principal.resource.attribute.labels.key にマッピングされ、value.headers.0 フィールドは principal.resource.attribute.labels.value にマッピングされます。 |
http.request.uri | read_only_udm.principal.asset.hostname | http.request.uri フィールドのホスト名部分が抽出され、principal.asset.hostname にマッピングされます。 |
http.request.uri | read_only_udm.principal.hostname | http.request.uri フィールドのホスト名部分が抽出され、principal.hostname にマッピングされます。 |
http.response.body | read_only_udm.principal.resource.attribute.labels.value | http.response.body フィールドは、キー「http_response_body」を持つ principal.resource.attributes.labels.value にマッピングされます。 |
http.response.headers[].key、http.response.headers[].value.headers.0 | read_only_udm.target.hostname | http.response.headers[].key フィールドが「Server」の場合、対応する http.response.headers[].value.headers.0 フィールドは target.hostname にマッピングされます。 |
http.response.headers[].key、http.response.headers[].value.headers.0 | read_only_udm.principal.resource.attribute.labels.key、read_only_udm.principal.resource.attribute.labels.value | http.response.headers 配列の各要素について、key フィールドは principal.resource.attribute.labels.key にマッピングされ、value.headers.0 フィールドは principal.resource.attribute.labels.value にマッピングされます。 |
http.response.statusCode | read_only_udm.network.http.response_code | http.response.statusCode フィールドは整数に変換され、network.http.response_code にマッピングされます。 |
ip | read_only_udm.target.asset.ip | ip フィールドは target.asset.ip にマッピングされます。 |
ip | read_only_udm.target.ip | ip フィールドは target.ip にマッピングされます。 |
isSeed | read_only_udm.additional.fields.value.string_value | isSeed フィールドは文字列に変換され、キー「isSeed」を持つ additional.fields.value.string_value にマッピングされます。 |
location.city | read_only_udm.principal.location.city | location.city フィールドは principal.location.city にマッピングされます。 |
location.continent | read_only_udm.additional.fields.value.string_value | location.continent フィールドは、キーが「location_continent」の additional.fields.value.string_value にマッピングされます。 |
location.coordinates.latitude | read_only_udm.principal.location.region_coordinates.latitude | location.coordinates.latitude フィールドは float に変換され、principal.location.region_coordinates.latitude にマッピングされます。 |
location.coordinates.longitude | read_only_udm.principal.location.region_coordinates.longitude | location.coordinates.longitude フィールドは float に変換され、principal.location.region_coordinates.longitude にマッピングされます。 |
location.country | read_only_udm.principal.location.country_or_region | location.country フィールドは principal.location.country_or_region にマッピングされます。 |
location.geoCoordinates.latitude | read_only_udm.principal.location.region_coordinates.latitude | location.coordinates.latitude フィールドが空の場合、location.geoCoordinates.latitude フィールドは float に変換され、principal.location.region_coordinates.latitude にマッピングされます。 |
location.geoCoordinates.longitude | read_only_udm.principal.location.region_coordinates.longitude | location.coordinates.longitude フィールドが空の場合、location.geoCoordinates.longitude フィールドは float に変換され、principal.location.region_coordinates.longitude にマッピングされます。 |
location.postalCode | read_only_udm.additional.fields.value.string_value | location.postalCode フィールドは、キーが「Postal code」の additional.fields.value.string_value にマッピングされます。 |
location.province | read_only_udm.principal.location.state | location.province フィールドは principal.location.state にマッピングされます。 |
オペレーション | read_only_udm.security_result.action_details | operation フィールドは security_result.action_details にマッピングされます。 |
perspectiveId | read_only_udm.principal.group.product_object_id | perspectiveId フィールドは principal.group.product_object_id にマッピングされます。 |
ポート | read_only_udm.principal.port | ポート フィールドは整数に変換され、principal.port にマッピングされます。 |
risks[].severity、risks[].title | read_only_udm.security_result.category_details | risks[].severity フィールドが risks[].title フィールドと連結され、security_result.category_details にマッピングされます。 |
serviceName | read_only_udm.network.application_protocol | serviceName フィールドが「HTTP」または「HTTPS」の場合、network.application_protocol にマッピングされます。 |
sourceIp | read_only_udm.principal.asset.ip | sourceIp フィールドは principal.asset.ip にマッピングされます。 |
sourceIp | read_only_udm.principal.ip | sourceIp フィールドは principal.ip にマッピングされます。 |
timestamp | read_only_udm.metadata.event_timestamp | タイムスタンプ フィールドはタイムスタンプとして解析され、metadata.event_timestamp にマッピングされます。 |
transportFingerprint.id | read_only_udm.metadata.product_log_id | transportFingerprint.id フィールドは文字列に変換され、metadata.product_log_id にマッピングされます。 |
transportFingerprint.raw | read_only_udm.additional.fields.value.string_value | transportFingerprint.raw フィールドは、キー「transportFingerprint_raw」を使用して additional.fields.value.string_value にマッピングされます。 |
type | read_only_udm.metadata.product_event_type | type フィールドは metadata.product_event_type にマッピングされます。 |
- | read_only_udm.metadata.product_name | metadata.product_name には「CENSYS_ASM」という値が割り当てられます。 |
- | read_only_udm.metadata.vendor_name | 値「CENSYS」が metadata.vendor_name に割り当てられます。 |
- | read_only_udm.metadata.event_type | イベントタイプは、特定のフィールドの有無に基づいて決定されます。has_princ_machine_id と has_target_machine が true で、has_network_flow が false の場合は NETWORK_CONNECTION、has_network_flow が true の場合は NETWORK_DNS、has_princ_machine_id が true の場合は STATUS_UPDATE、それ以外の場合は GENERIC_EVENT です。 |
さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。