收集 Apache Cassandra 記錄
本文說明如何使用 Bindplane,將 Apache Cassandra 記錄檔擷取至 Google Security Operations。剖析器會擷取欄位,並轉換為 Unified Data Model (UDM)。這個外掛程式會使用 grok 模式剖析初始訊息,然後使用 JSON 篩選器處理巢狀資料,並執行條件式轉換,將各種欄位對應至 UDM 對等項目、處理不同記錄層級,以及使用中繼資料擴充輸出內容。
事前準備
請確認您已完成下列事前準備事項:
- Google SecOps 執行個體
- Windows 2016 以上版本或 Linux 主機 (含 systemd)
- 如果透過 Proxy 執行,防火牆通訊埠已開啟
- Apache Cassandra 執行個體的特殊存取權
取得 Google SecOps 擷取驗證檔案
- 登入 Google SecOps 控制台。
- 依序前往「SIEM 設定」>「收集代理程式」。
- 下載擷取驗證檔案。將檔案安全地儲存在要安裝 Bindplane 的系統上。
取得 Google SecOps 客戶 ID
- 登入 Google SecOps 控制台。
- 依序前往「SIEM 設定」>「設定檔」。
- 複製並儲存「機構詳細資料」專區中的客戶 ID。
安裝 Bindplane 代理程式
Windows 安裝
- 以系統管理員身分開啟「命令提示字元」或「PowerShell」。
執行下列指令:
msiexec /i `https://github.com/observIQ/bindplane-agent/releases/latest/download/observiq-otel-collector.msi` /quiet
Linux 安裝
- 開啟具有根層級或 sudo 權限的終端機。
執行下列指令:
sudo sh -c `$(curl -fsSlL https://github.com/observiq/bindplane-agent/releases/latest/download/install_unix.sh)` install_unix.sh
其他安裝資源
如需其他安裝選項,請參閱安裝指南。
設定 Bindplane 代理程式,擷取系統記錄檔並傳送至 Google SecOps
- 存取設定檔:
- 找出
config.yaml
檔案。通常位於 Linux 的/etc/bindplane-agent/
目錄,或 Windows 的安裝目錄。 - 使用文字編輯器 (例如
nano
、vi
或記事本) 開啟檔案。
- 找出
按照下列方式編輯
config.yaml
檔案:receivers: udplog: # Replace the port and IP address as required listen_address: `0.0.0.0:514` exporters: chronicle/chronicle_w_labels: compression: gzip # Adjust the path to the credentials file you downloaded in Step 1 creds: '/path/to/ingestion-authentication-file.json' # Replace with your actual customer ID from Step 2 customer_id: <customer_id> endpoint: malachiteingestion-pa.googleapis.com # Add optional ingestion labels for better organization ingestion_labels: log_type: 'CASSANDRA' raw_log_field: body service: pipelines: logs/source0__chronicle_w_labels-0: receivers: - udplog exporters: - chronicle/chronicle_w_labels
視基礎架構需求,替換通訊埠和 IP 位址。
將
<customer_id>
替換為實際的客戶 ID。將
/path/to/ingestion-authentication-file.json
更新為「取得 Google SecOps 擷取驗證檔案」一節中驗證檔案的儲存路徑。
重新啟動 Bindplane 代理程式,以套用變更
如要在 Linux 中重新啟動 Bindplane 代理程式,請執行下列指令:
sudo systemctl restart bindplane-agent
如要在 Windows 中重新啟動 Bindplane 代理程式,可以使用「服務」控制台,或輸入下列指令:
net stop BindPlaneAgent && net start BindPlaneAgent
在 Apache Cassandra 中設定 Syslog 匯出功能
- 使用 SSH 登入 Apache Cassandra 主機。
- 開啟設定檔
logback.xml
,並在第 28 行插入下列程式碼:- 在大多數版本的 Apache Cassandra 中,位置會是
$(CASSANDRA_HOME)/conf
。 - 如果是 Datastax Enterprise 的套件安裝作業,位置會是
/etc/dse
。 - 如果是 DSE 的 Tar 檔案安裝,位置會是
$(TARBALL_ROOT)/resources/cassandra/conf
。
- 在大多數版本的 Apache Cassandra 中,位置會是
在第 28 行的
logback.xml
檔案中新增以下 Appender 定義:<appender name="SYSLOG" class="ch.qos.logback.classic.net.SyslogAppender"> <syslogHost>bindplane-ip</syslogHost> <port>bindplane-port</port> <facility>LOCAL7</facility> <throwableExcluded>true</throwableExcluded> <suffixPattern>%thread:%level:%logger{36}:%msg</suffixPattern> </appender>
請將
bindplane-ip
和bindplane-port
替換為實際的 Bindplane 代理程式 IP 位址和連接埠。在
logback.xml
檔案的根記錄器區塊<root level=
INFO>
中新增下列程式碼:插入這行的位置取決於 Apache Cassandra 版本:
- Apache Cassandra 5.0.x,第 123 行。
- Apache Cassandra 4.0.x 和 4.1.x,第 115 行。
- Apache Cassandra 3.11.x 和 3.0.x,第 92 行。
- Datastax Enterprise (所有版本),第 121 行。
<appender-ref ref=`SYSLOG` />
UDM 對應表
記錄欄位 | UDM 對應 | 邏輯 |
---|---|---|
agent.ephemeral_id |
observer.labels.value |
內部 JSON 訊息中的 agent.ephemeral_id 值。 |
agent.hostname |
observer.hostname |
內部 JSON 訊息中的 agent.hostname 值。 |
agent.id |
observer.asset_id |
filebeat: 與內部 JSON 訊息中 agent.id 值的串連。 |
agent.name |
observer.user.userid |
內部 JSON 訊息中的 agent.name 值。 |
agent.type |
observer.application |
內部 JSON 訊息中的 agent.type 值。 |
agent.version |
observer.platform_version |
內部 JSON 訊息中的 agent.version 值。 |
cloud.availability_zone |
principal.cloud.availability_zone |
內部 JSON 訊息中的 cloud.availability_zone 值。 |
cloud.instance.id |
principal.resource.product_object_id |
內部 JSON 訊息中的 cloud.instance.id 值。 |
cloud.instance.name |
principal.resource.name |
內部 JSON 訊息中的 cloud.instance.name 值。 |
cloud.machine.type |
principal.resource.attribute.labels.value |
內部 JSON 訊息的 cloud.machine.type 值,其中對應的 key 為 machine_type 。 |
cloud.provider |
principal.resource.attribute.labels.value |
內部 JSON 訊息的 cloud.provider 值,其中對應的 key 為 provider 。 |
event_metadata._id |
metadata.product_log_id |
內部 JSON 訊息中的 event_metadata._id 值。 |
event_metadata.version |
metadata.product_version |
內部 JSON 訊息中的 event_metadata.version 值。 |
host.architecture |
target.asset.hardware.cpu_platform |
內部 JSON 訊息中的 host.architecture 值。 |
host.fqdn |
target.administrative_domain |
內部 JSON 訊息中的 host.fqdn 值。 |
host.hostname |
target.hostname |
內部 JSON 訊息中的 host.hostname 值。 |
host.id |
target.asset.asset_id |
Host Id: 與內部 JSON 訊息中 host.id 值的串連。 |
host.ip |
target.asset.ip |
內部 JSON 訊息中 host.ip 的 IP 位址陣列。 |
host.mac |
target.mac |
來自內部 JSON 訊息的 host.mac MAC 位址陣列。 |
host.os.kernel |
target.platform_patch_level |
內部 JSON 訊息中的 host.os.kernel 值。 |
host.os.platform |
target.platform |
如果 host.os.platform 為 debian ,請設為 LINUX 。 |
host.os.version |
target.platform_version |
內部 JSON 訊息中的 host.os.version 值。 |
hostname |
principal.hostname |
使用 grok 從 message 欄位擷取的 hostname 值。 |
key |
security_result.detection_fields.value |
使用 grok 從 message 欄位擷取的 key 值,其中對應的 key 為 key 。 |
log.file.path |
principal.process.file.full_path |
內部 JSON 訊息中的 log.file.path 值。 |
log_level |
security_result.severity |
根據 log_level 的值對應:DEBUG 、INFO 、AUDIT 對應至 INFORMATIONAL ;ERROR 對應至 ERROR ;WARNING 對應至 MEDIUM 。 |
log_level |
security_result.severity_details |
使用 grok 從 message 欄位擷取的 log_level 值。 |
log_type |
metadata.log_type |
原始記錄中的 log_type 值。 |
message |
security_result.description |
使用 grok 從 message 欄位擷取的說明。 |
message |
target.process.command_line |
使用 grok 從 message 欄位擷取的指令列。 |
now |
security_result.detection_fields.value |
使用 grok 從 message 欄位擷取的 now 值,其中對應的 key 為 now 。使用 grok 從 message 欄位擷取 event_time 欄位,並從中剖析。如果同時存在 hostname 和 host.hostname ,請設為 USER_RESOURCE_ACCESS ,否則請設為 GENERIC_EVENT 。請設為 CASSANDRA 。請設為 CASSANDRA 。請設為 ephemeral_id 。如果 cloud.instance.name 存在,請設為 VIRTUAL_MACHINE 。將對應的偵測欄位設為 key 和 now 。 |
timestamp |
timestamp |
來自原始記錄的 create_time 欄位。 |
還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。