「Pub/Sub 到 Elasticsearch」範本是一個串流管道,可從 Pub/Sub 訂閱項目讀取訊息、執行使用者定義函式 (UDF),並以文件形式寫入 Elasticsearch。Dataflow 範本會使用 Elasticsearch 的資料串流功能,在多個索引中儲存時間序列資料,同時為要求提供單一具名資源。資料串流非常適合用於記錄、指標、追蹤及其他儲存在 Pub/Sub 中的持續產生資料。
範本會建立名為 logs-gcp.DATASET-NAMESPACE
的資料串流,其中:
- DATASET 是
dataset
範本參數的值,如未指定,則為pubsub
。 - NAMESPACE 是
namespace
範本參數的值,如未指定,則為default
。
管道相關規定
- 來源 Pub/Sub 訂閱項目必須存在,且訊息必須以有效的 JSON 格式編碼。
- 執行個體或 Elastic Cloud 上可公開連線的 Elasticsearch 主機,且 Elasticsearch 版本為 7.0 以上。 Google Cloud 詳情請參閱「Google Cloud Integration for Elastic」。
- 用於錯誤輸出的 Pub/Sub 主題。
範本參數
必要參數
- inputSubscription:要取用輸入內容的 Pub/Sub 訂閱項目。例如:
projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>
。 - errorOutputTopic:用於發布失敗記錄的 Pub/Sub 輸出主題,格式為
projects/<PROJECT_ID>/topics/<TOPIC_NAME>
。 - connectionUrl:Elasticsearch 網址,格式為
https://hostname:[port]
。如果使用 Elastic Cloud,請指定 CloudID。例如:https://elasticsearch-host:9200
。 - apiKey:用於驗證的 Base64 編碼 API 金鑰。
選用參數
- 資料集:透過 Pub/Sub 傳送的記錄類型,我們提供現成的資訊主頁。已知的記錄類型值為
audit
、vpcflow
和firewall
。預設值為pubsub
。 - 命名空間:任意分組,例如環境 (開發、實際工作或 QA)、團隊或策略性業務單位。預設值為
default
。 - elasticsearchTemplateVersion:Dataflow 範本版本 ID,通常由 Google Cloud 定義。預設值為 1.0.0。
- javascriptTextTransformGcsPath:定義要使用的 JavaScript 使用者定義函式 (UDF) 的 .js 檔案 Cloud Storage URI。例如:
gs://my-bucket/my-udfs/my_file.js
。 - javascriptTextTransformFunctionName:要使用的 JavaScript 使用者定義函式 (UDF) 名稱。舉例來說,如果您的 JavaScript 函式程式碼是
myTransform(inJson) { /*...do stuff...*/ }
,則函式名稱就是myTransform
。如需 JavaScript UDF 範例,請參閱 UDF 範例 (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples)。 - javascriptTextTransformReloadIntervalMinutes:指定重新載入 UDF 的頻率 (以分鐘為單位)。如果值大於 0,Dataflow 會定期檢查 Cloud Storage 中的 UDF 檔案,並在檔案經過修改時重新載入 UDF。您可以在管道執行期間更新 UDF,不必重新啟動工作。如果值為
0
,系統會停用 UDF 重新載入功能。預設值為0
。 - elasticsearchUsername:用於驗證的 Elasticsearch 使用者名稱。如果指定,系統會忽略
apiKey
的值。 - elasticsearchPassword:用於驗證的 Elasticsearch 密碼。如果指定,系統會忽略
apiKey
的值。 - batchSize:批次大小,以文件數量為單位。預設值為
1000
。 - batchSizeBytes:批次大小 (以位元組為單位)。預設值為
5242880
(5 MB)。 - maxRetryAttempts:重試次數上限。必須大於零。預設值為
no retries
。 - maxRetryDuration:重試時間上限 (以毫秒為單位)。必須大於零。預設值為
no retries
。 - propertyAsIndex:要建立索引的文件中的屬性,其值會指定要與文件一起納入大量要求的
_index
中繼資料。優先順序高於_index
UDF。預設值為none
。 - javaScriptIndexFnGcsPath:JavaScript UDF 來源的 Cloud Storage 路徑,適用於指定
_index
中繼資料的函式,可與大量要求中的文件一併納入。預設值為none
。 - javaScriptIndexFnName:UDF JavaScript 函式的名稱,用於指定要隨文件一併納入大量要求的
_index
中繼資料。預設值為none
。 - propertyAsId:要建立索引的文件中的屬性,其值會指定要納入大量要求中的
_id
中繼資料。優先順序高於_id
UDF。預設值為none
。 - javaScriptIdFnGcsPath:JavaScript UDF 來源的 Cloud Storage 路徑,適用於指定
_id
中繼資料的函式,可與大量要求中的文件一併納入。預設值為none
。 - javaScriptIdFnName:UDF JavaScript 函式的名稱,用於指定要隨文件一併納入大量要求的
_id
中繼資料。預設值為none
。 - javaScriptTypeFnGcsPath:JavaScript UDF 來源的 Cloud Storage 路徑,適用於指定
_type
中繼資料的函式,可與大量要求中的文件一併納入。預設值為none
。 - javaScriptTypeFnName:UDF JavaScript 函式的名稱,用於指定要隨文件一併納入大量要求的
_type
中繼資料。預設值為none
。 - javaScriptIsDeleteFnGcsPath:JavaScript UDF 來源的 Cloud Storage 路徑,用於判斷是否要刪除文件,而不是插入或更新文件。函式會傳回
true
或false
的字串值。預設值為none
。 - javaScriptIsDeleteFnName:UDF JavaScript 函式的名稱,用於判斷是否要刪除文件,而不是插入或更新文件。函式會傳回
true
或false
的字串值。預設值為none
。 - usePartialUpdate:是否要對 Elasticsearch 要求使用部分更新 (更新而非建立或建立索引,允許部分文件)。預設值為
false
。 - bulkInsertMethod:是否要搭配使用
INDEX
(索引,允許 upsert) 或CREATE
(建立,重複的 _id 會發生錯誤) 與 Elasticsearch 大量要求。預設值為CREATE
。 - trustSelfSignedCerts:是否信任自行簽署的憑證。安裝的 Elasticsearch 執行個體可能具有自行簽署的憑證,請將此值設為 true,略過 SSL 憑證的驗證。(預設值為
false
)。 - disableCertificateValidation:如果為
true
,請信任自行簽署的 SSL 憑證。Elasticsearch 執行個體可能具有自行簽署的憑證。如要略過憑證驗證,請將這個參數設為true
。預設值為false
。 - apiKeyKMSEncryptionKey:用於解密 API 金鑰的 Cloud KMS 金鑰。如果
apiKeySource
設為KMS
,則必須使用這項參數。如果提供這項參數,請傳遞加密的apiKey
字串。使用 KMS API 加密端點加密參數。鍵的格式為projects/<PROJECT_ID>/locations/<KEY_REGION>/keyRings/<KEY_RING>/cryptoKeys/<KMS_KEY_NAME>
。請參閱:https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt。例如projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name
。 - apiKeySecretId:apiKey 的 Secret Manager 密鑰 ID。如果
apiKeySource
設為SECRET_MANAGER
,請提供這項參數。請使用projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. For example,
projects/your-project-id/secrets/your-secret/versions/your-secret-version 格式。 - apiKeySource:API 金鑰的來源。允許的值為
PLAINTEXT
、KMS
或SECRET_MANAGER
。使用 Secret Manager 或 KMS 時,必須提供這個參數。如果apiKeySource
設為KMS
,則必須提供apiKeyKMSEncryptionKey
和加密的 apiKey。如果apiKeySource
設為SECRET_MANAGER
,則必須提供apiKeySecretId
。如果apiKeySource
設為PLAINTEXT
,則必須提供apiKey
。預設值為 PLAINTEXT。 - socketTimeout:如果設定此參數,系統會覆寫 Elastic RestClient 中的預設重試逾時時間上限和預設 Socket 逾時時間 (30000 毫秒)。
使用者定義函式
此範本在管道的幾個點支援使用者定義函式 (UDF),詳情如下。詳情請參閱「為 Dataflow 範本建立使用者定義函式」。
文字轉換函式
將 Pub/Sub 訊息轉換為 Elasticsearch 文件。
範本參數:
javascriptTextTransformGcsPath
:JavaScript 檔案的 Cloud Storage URI。javascriptTextTransformFunctionName
:JavaScript 函式的名稱。
函式規格:
- 輸入:Pub/Sub 訊息資料欄位,序列化為 JSON 字串。
- 輸出:要插入 Elasticsearch 的字串化 JSON 文件。
索引函式
傳回文件所屬的索引。
範本參數:
javaScriptIndexFnGcsPath
:JavaScript 檔案的 Cloud Storage URI。javaScriptIndexFnName
:JavaScript 函式的名稱。
函式規格:
- 輸入:Elasticsearch 文件,以 JSON 字串形式序列化。
- 輸出:文件
_index
中繼資料欄位的值。
文件 ID 函式
傳回文件 ID。
範本參數:
javaScriptIdFnGcsPath
:JavaScript 檔案的 Cloud Storage URI。javaScriptIdFnName
:JavaScript 函式的名稱。
函式規格:
- 輸入:以 JSON 字串序列化的 Elasticsearch 文件。
- 輸出:文件
_id
中繼資料欄位的值。
文件刪除功能
指定是否要刪除文件。如要使用這項函式,請將大量插入模式設為 INDEX
,並提供文件 ID 函式。
範本參數:
javaScriptIsDeleteFnGcsPath
:JavaScript 檔案的 Cloud Storage URI。javaScriptIsDeleteFnName
:JavaScript 函式的名稱。
函式規格:
- 輸入:以 JSON 字串序列化的 Elasticsearch 文件。
- 輸出:傳回字串
"true"
刪除文件,或"false"
插入/更新文件。
對應類型函式
傳回文件的對應類型。
範本參數:
javaScriptTypeFnGcsPath
:JavaScript 檔案的 Cloud Storage URI。javaScriptTypeFnName
:JavaScript 函式的名稱。
函式規格:
- 輸入:以 JSON 字串序列化的 Elasticsearch 文件。
- 輸出:文件
_type
中繼資料欄位的值。
執行範本
控制台
- 前往 Dataflow 的「Create job from template」(透過範本建立工作) 頁面。 前往「依據範本建立工作」
- 在「工作名稱」欄位中,輸入專屬工作名稱。
- 選用:如要使用區域端點,請從下拉式選單中選取值。預設區域為
us-central1
。如需可執行 Dataflow 工作的地區清單,請參閱「Dataflow 位置」。
- 從「Dataflow template」(Dataflow 範本) 下拉式選單中選取 the Pub/Sub to Elasticsearch template。
- 在提供的參數欄位中輸入參數值。
- 按一下「Run Job」(執行工作)。
gcloud
在殼層或終端機中執行範本:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_Elasticsearch_Flex \ --parameters \ inputSubscription=SUBSCRIPTION_NAME,\ connectionUrl=CONNECTION_URL,\ dataset=DATASET,\ namespace=NAMESPACE,\ apiKey=APIKEY,\ errorOutputTopic=ERROR_OUTPUT_TOPIC
更改下列內容:
PROJECT_ID
: 您要執行 Dataflow 工作的專案 ID Google CloudJOB_NAME
: 您選擇的不重複工作名稱REGION_NAME
: 您要部署 Dataflow 工作的地區,例如us-central1
VERSION
: 您要使用的範本版本您可以使用下列值:
latest
,使用範本的最新版本,該版本位於值區中非依日期命名的上層資料夾:gs://dataflow-templates-REGION_NAME/latest/- 版本名稱 (例如
2023-09-12-00_RC00
),用於指定範本版本,該版本會以巢狀結構存放在值區中依日期命名的上層資料夾中:gs://dataflow-templates-REGION_NAME/
ERROR_OUTPUT_TOPIC
:用於錯誤輸出的 Pub/Sub 主題SUBSCRIPTION_NAME
:您的 Pub/Sub 訂閱項目名稱CONNECTION_URL
:Elasticsearch 網址DATASET
:記錄類型NAMESPACE
:資料集的命名空間APIKEY
:用於驗證的 base64 編碼 API 金鑰
API
如要使用 REST API 執行範本,請傳送 HTTP POST 要求。如要進一步瞭解 API 和授權範圍,請參閱 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputSubscription": "SUBSCRIPTION_NAME", "connectionUrl": "CONNECTION_URL", "dataset": "DATASET", "namespace": "NAMESPACE", "apiKey": "APIKEY", "errorOutputTopic": "ERROR_OUTPUT_TOPIC" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_to_Elasticsearch_Flex", } }
更改下列內容:
PROJECT_ID
: 您要執行 Dataflow 工作的專案 ID Google CloudJOB_NAME
: 您選擇的不重複工作名稱LOCATION
: 您要部署 Dataflow 工作的地區,例如us-central1
VERSION
: 您要使用的範本版本您可以使用下列值:
latest
,使用範本的最新版本,該版本位於值區中非依日期命名的上層資料夾:gs://dataflow-templates-REGION_NAME/latest/- 版本名稱 (例如
2023-09-12-00_RC00
),用於指定範本版本,該版本會以巢狀結構存放在值區中依日期命名的上層資料夾中:gs://dataflow-templates-REGION_NAME/
ERROR_OUTPUT_TOPIC
:用於錯誤輸出的 Pub/Sub 主題SUBSCRIPTION_NAME
:您的 Pub/Sub 訂閱項目名稱CONNECTION_URL
:Elasticsearch 網址DATASET
:記錄類型NAMESPACE
:資料集的命名空間APIKEY
:用於驗證的 base64 編碼 API 金鑰
後續步驟
- 瞭解 Dataflow 範本。
- 請參閱 Google 提供的範本清單。