Cloud Storage 到 Elasticsearch 範本

「Cloud Storage 到 Elasticsearch」範本是一種批次管道,可從 Cloud Storage 值區中儲存的 CSV 檔案讀取資料,並將資料以 JSON 文件形式寫入 Elasticsearch。

管道相關規定

  • Cloud Storage 值區必須存在。
  • 您必須在 Google Cloud 執行個體或 Elasticsearch Cloud 上建立可從 Dataflow 存取的 Elasticsearch 主機。
  • 錯誤輸出內容的 BigQuery 資料表必須存在。

CSV 結構定義

如果 CSV 檔案包含標頭,請將 containsHeaders 範本參數設為 true

否則,請建立描述資料的 JSON 結構定義檔。在 jsonSchemaPath 範本參數中,指定結構定義檔案的 Cloud Storage URI。以下範例顯示 JSON 結構定義:

[{"name":"id", "type":"text"}, {"name":"age", "type":"integer"}]

或者,您也可以提供使用者定義函式 (UDF),剖析 CSV 文字並輸出 Elasticsearch 文件。

範本參數

必要參數

  • deadletterTable:用於傳送插入失敗資料的 BigQuery 無效信件資料表。例如:your-project:your-dataset.your-table-name
  • inputFileSpec:用於搜尋 CSV 檔案的 Cloud Storage 檔案模式。例如:gs://mybucket/test-*.csv
  • connectionUrl:Elasticsearch 網址,格式為 https://hostname:[port]。如果使用 Elastic Cloud,請指定 CloudID。例如:https://elasticsearch-host:9200
  • apiKey:用於驗證的 Base64 編碼 API 金鑰。
  • index:要求發送至的 Elasticsearch 索引。例如:my-index

選用參數

  • inputFormat:輸入檔案格式。預設值為 CSV
  • containsHeaders:輸入的 CSV 檔案是否包含標頭記錄 (true/false)。僅在讀取 CSV 檔案時需要。預設值為 false。
  • delimiter:輸入文字檔的資料欄分隔符號。預設值:,。舉例來說,,
  • csvFormat:用於剖析記錄的 CSV 格式規格。預設值為 Default。詳情請參閱 https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.html。必須與以下網址中完全相同的格式名稱相符:https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.Predefined.html
  • jsonSchemaPath:JSON 結構定義的路徑。預設值為 null。例如:gs://path/to/schema
  • largeNumFiles:如果檔案數量達到數萬個,請設為 true。預設值為 false
  • csvFileEncoding:CSV 檔案的字元編碼格式。允許的值為 US-ASCIIISO-8859-1UTF-8UTF-16。預設值為 UTF-8。
  • logDetailedCsvConversionErrors:設為 true,即可在 CSV 剖析失敗時啟用詳細錯誤記錄。請注意,這可能會在記錄中公開機密資料 (例如,CSV 檔案包含密碼)。預設值:false
  • elasticsearchUsername:用於驗證的 Elasticsearch 使用者名稱。如果指定,系統會忽略 apiKey 的值。
  • elasticsearchPassword:用於驗證的 Elasticsearch 密碼。如果指定,系統會忽略 apiKey 的值。
  • batchSize:批次大小,以文件數量為單位。預設值為 1000
  • batchSizeBytes:批次大小 (以位元組為單位)。預設值為 5242880 (5 MB)。
  • maxRetryAttempts:重試次數上限。必須大於零。預設值為 no retries
  • maxRetryDuration:重試時間上限 (以毫秒為單位)。必須大於零。預設值為 no retries
  • propertyAsIndex:要建立索引的文件中的屬性,其值會指定要與文件一起納入大量要求的 _index 中繼資料。優先順序高於 _index UDF。預設值為 none
  • javaScriptIndexFnGcsPath:JavaScript UDF 來源的 Cloud Storage 路徑,適用於指定 _index 中繼資料的函式,可與大量要求中的文件一併納入。預設值為 none
  • javaScriptIndexFnName:UDF JavaScript 函式的名稱,用於指定要隨文件一併納入大量要求的 _index 中繼資料。預設值為 none
  • propertyAsId:要建立索引的文件中的屬性,其值會指定要納入大量要求中的 _id 中繼資料。優先順序高於 _id UDF。預設值為 none
  • javaScriptIdFnGcsPath:JavaScript UDF 來源的 Cloud Storage 路徑,適用於指定 _id 中繼資料的函式,可與大量要求中的文件一併納入。預設值為 none
  • javaScriptIdFnName:UDF JavaScript 函式的名稱,用於指定要隨文件一併納入大量要求的 _id 中繼資料。預設值為 none
  • javaScriptTypeFnGcsPath:JavaScript UDF 來源的 Cloud Storage 路徑,適用於指定 _type 中繼資料的函式,可與大量要求中的文件一併納入。預設值為 none
  • javaScriptTypeFnName:UDF JavaScript 函式的名稱,用於指定要隨文件一併納入大量要求的 _type 中繼資料。預設值為 none
  • javaScriptIsDeleteFnGcsPath:JavaScript UDF 來源的 Cloud Storage 路徑,用於判斷是否要刪除文件,而不是插入或更新文件。函式會傳回 truefalse 的字串值。預設值為 none
  • javaScriptIsDeleteFnName:UDF JavaScript 函式的名稱,用於判斷是否要刪除文件,而不是插入或更新文件。函式會傳回 truefalse 的字串值。預設值為 none
  • usePartialUpdate:是否要對 Elasticsearch 要求使用部分更新 (更新而非建立或建立索引,允許部分文件)。預設值為 false
  • bulkInsertMethod:是否要搭配使用 INDEX (索引,允許 upsert) 或 CREATE (建立,重複的 _id 會發生錯誤) 與 Elasticsearch 大量要求。預設值為 CREATE
  • trustSelfSignedCerts:是否信任自行簽署的憑證。安裝的 Elasticsearch 執行個體可能具有自行簽署的憑證,請將此值設為 true,略過 SSL 憑證的驗證。(預設值為 false)。
  • disableCertificateValidation:如果為 true,請信任自行簽署的 SSL 憑證。Elasticsearch 執行個體可能具有自行簽署的憑證。如要略過憑證驗證,請將這個參數設為 true。預設值為 false
  • apiKeyKMSEncryptionKey:用於解密 API 金鑰的 Cloud KMS 金鑰。如果 apiKeySource 設為 KMS,則必須使用這項參數。如果提供這項參數,請傳遞加密的 apiKey 字串。使用 KMS API 加密端點加密參數。鍵的格式為 projects/<PROJECT_ID>/locations/<KEY_REGION>/keyRings/<KEY_RING>/cryptoKeys/<KMS_KEY_NAME>。請參閱:https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt。例如 projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name
  • apiKeySecretId:apiKey 的 Secret Manager 密鑰 ID。如果 apiKeySource 設為 SECRET_MANAGER,請提供這項參數。請使用 projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. For example, projects/your-project-id/secrets/your-secret/versions/your-secret-version 格式。
  • apiKeySource:API 金鑰的來源。允許的值為 PLAINTEXTKMSSECRET_MANAGER。使用 Secret Manager 或 KMS 時,必須提供這個參數。如果 apiKeySource 設為 KMS,則必須提供 apiKeyKMSEncryptionKey 和加密的 apiKey。如果 apiKeySource 設為 SECRET_MANAGER,則必須提供 apiKeySecretId。如果 apiKeySource 設為 PLAINTEXT,則必須提供 apiKey。預設值為 PLAINTEXT。
  • socketTimeout:如果設定此參數,系統會覆寫 Elastic RestClient 中的預設重試逾時時間上限和預設 Socket 逾時時間 (30000 毫秒)。
  • javascriptTextTransformGcsPath:定義要使用的 JavaScript 使用者定義函式 (UDF) 的 .js 檔案 Cloud Storage URI。例如:gs://my-bucket/my-udfs/my_file.js
  • javascriptTextTransformFunctionName:要使用的 JavaScript 使用者定義函式 (UDF) 名稱。舉例來說,如果您的 JavaScript 函式程式碼是 myTransform(inJson) { /*...do stuff...*/ },則函式名稱就是 myTransform。如需 JavaScript UDF 範例,請參閱 UDF 範例 (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples)。

使用者定義函式

此範本在管道的幾個點支援使用者定義函式 (UDF),詳情如下。詳情請參閱「為 Dataflow 範本建立使用者定義函式」。

文字轉換函式

將 CSV 資料轉換為 Elasticsearch 文件。

範本參數:

  • javascriptTextTransformGcsPath:JavaScript 檔案的 Cloud Storage URI。
  • javascriptTextTransformFunctionName:JavaScript 函式的名稱。

函式規格:

  • 輸入:輸入 CSV 檔案中的單行。
  • 輸出:要插入 Elasticsearch 的字串化 JSON 文件。

索引函式

傳回文件所屬的索引。

範本參數:

  • javaScriptIndexFnGcsPath:JavaScript 檔案的 Cloud Storage URI。
  • javaScriptIndexFnName:JavaScript 函式的名稱。

函式規格:

  • 輸入:Elasticsearch 文件,以 JSON 字串形式序列化。
  • 輸出:文件 _index 中繼資料欄位的值。

文件 ID 函式

傳回文件 ID。

範本參數:

  • javaScriptIdFnGcsPath:JavaScript 檔案的 Cloud Storage URI。
  • javaScriptIdFnName:JavaScript 函式的名稱。

函式規格:

  • 輸入:以 JSON 字串序列化的 Elasticsearch 文件。
  • 輸出:文件 _id 中繼資料欄位的值。

文件刪除功能

指定是否要刪除文件。如要使用這項函式,請將大量插入模式設為 INDEX,並提供文件 ID 函式

範本參數:

  • javaScriptIsDeleteFnGcsPath:JavaScript 檔案的 Cloud Storage URI。
  • javaScriptIsDeleteFnName:JavaScript 函式的名稱。

函式規格:

  • 輸入:以 JSON 字串序列化的 Elasticsearch 文件。
  • 輸出:傳回字串 "true" 刪除文件,或 "false" 插入/更新文件。

對應類型函式

傳回文件的對應類型。

範本參數:

  • javaScriptTypeFnGcsPath:JavaScript 檔案的 Cloud Storage URI。
  • javaScriptTypeFnName:JavaScript 函式的名稱。

函式規格:

  • 輸入:以 JSON 字串序列化的 Elasticsearch 文件。
  • 輸出:文件 _type 中繼資料欄位的值。

執行範本

控制台

  1. 前往 Dataflow 的「Create job from template」(透過範本建立工作) 頁面。
  2. 前往「依據範本建立工作」
  3. 在「工作名稱」欄位中,輸入專屬工作名稱。
  4. 選用:如要使用區域端點,請從下拉式選單中選取值。預設區域為 us-central1

    如需可執行 Dataflow 工作的地區清單,請參閱「Dataflow 位置」。

  5. 從「Dataflow template」(Dataflow 範本) 下拉式選單中選取 the Cloud Storage to Elasticsearch template。
  6. 在提供的參數欄位中輸入參數值。
  7. 按一下「Run Job」(執行工作)

gcloud

在殼層或終端機中執行範本:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID\
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/GCS_to_Elasticsearch \
    --parameters \
inputFileSpec=INPUT_FILE_SPEC,\
connectionUrl=CONNECTION_URL,\
apiKey=APIKEY,\
index=INDEX,\
deadletterTable=DEADLETTER_TABLE,\

更改下列內容:

  • PROJECT_ID: 您要執行 Dataflow 工作的專案 ID Google Cloud
  • JOB_NAME: 您選擇的不重複工作名稱
  • VERSION: 您要使用的範本版本

    您可以使用下列值:

  • REGION_NAME: 您要部署 Dataflow 工作的地區,例如 us-central1
  • INPUT_FILE_SPEC:Cloud Storage 檔案模式。
  • CONNECTION_URL:您的 Elasticsearch 網址。
  • APIKEY:用於驗證的 base64 編碼 API 金鑰。
  • INDEX:您的 Elasticsearch 索引。
  • DEADLETTER_TABLE:您的 BigQuery 資料表。

API

如要使用 REST API 執行範本,請傳送 HTTP POST 要求。如要進一步瞭解 API 和授權範圍,請參閱 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputFileSpec": "INPUT_FILE_SPEC",
          "connectionUrl": "CONNECTION_URL",
          "apiKey": "APIKEY",
          "index": "INDEX",
          "deadletterTable": "DEADLETTER_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/GCS_to_Elasticsearch",
   }
}

更改下列內容:

  • PROJECT_ID: 您要執行 Dataflow 工作的專案 ID Google Cloud
  • JOB_NAME: 您選擇的不重複工作名稱
  • VERSION: 您要使用的範本版本

    您可以使用下列值:

  • LOCATION: 您要部署 Dataflow 工作的地區,例如 us-central1
  • INPUT_FILE_SPEC:Cloud Storage 檔案模式。
  • CONNECTION_URL:您的 Elasticsearch 網址。
  • APIKEY:用於驗證的 base64 編碼 API 金鑰。
  • INDEX:您的 Elasticsearch 索引。
  • DEADLETTER_TABLE:您的 BigQuery 資料表。

後續步驟