Modello Pub/Sub a Elasticsearch

Il modello Pub/Sub a Elasticsearch è una pipeline di streaming che legge i messaggi da una sottoscrizione Pub/Sub, esegue una funzione definita dall'utente (UDF) e li scrive in Elasticsearch come documenti. Il modello Dataflow utilizza la funzionalità flussi di dati di Elasticsearch per archiviare i dati delle serie temporali in più indici, fornendoti al contempo una singola risorsa denominata per le richieste. I flussi di dati sono adatti a log, metriche, tracce e altri dati generati continuamente archiviati in Pub/Sub.

Il modello crea uno stream di dati denominato logs-gcp.DATASET-NAMESPACE, dove:

  • DATASET è il valore del parametro del modello dataset o pubsub se non specificato.
  • NAMESPACE è il valore del parametro del modello namespace o default se non specificato.

Requisiti della pipeline

  • La sottoscrizione Pub/Sub di origine deve esistere e i messaggi devono essere codificati in un formato JSON valido.
  • Un host Elasticsearch raggiungibile pubblicamente su un'istanza Google Cloud o su Elastic Cloud con Elasticsearch versione 7.0 o successive. Per ulteriori dettagli, consulta Integrazione di Google Cloud per Elastic.
  • Un argomento Pub/Sub per l'output degli errori.

Parametri del modello

Parametri obbligatori

  • inputSubscription: sottoscrizione Pub/Sub da cui utilizzare l'input. Ad esempio, projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>.
  • errorOutputTopic: l'argomento di output Pub/Sub per la pubblicazione dei record non riusciti, nel formato projects/<PROJECT_ID>/topics/<TOPIC_NAME>.
  • connectionUrl: l'URL di Elasticsearch nel formato https://hostname:[port]. Se utilizzi Elastic Cloud, specifica CloudID. Ad esempio: https://elasticsearch-host:9200.
  • apiKey: la chiave API codificata in Base64 da utilizzare per l'autenticazione.

Parametri facoltativi

  • Dataset: il tipo di log inviati utilizzando Pub/Sub, per i quali disponiamo di una dashboard predefinita. I valori dei tipi di log noti sono audit, vpcflow e firewall. Valore predefinito: pubsub.
  • Spazio dei nomi: un raggruppamento arbitrario, ad esempio un ambiente (sviluppo, produzione o controllo qualità), un team o un'unità aziendale strategica. Valore predefinito: default.
  • elasticsearchTemplateVersion: identificatore della versione del modello Dataflow, di solito definito da Google Cloud. Il valore predefinito è 1.0.0.
  • javascriptTextTransformGcsPath: l'URI Cloud Storage del file .js che definisce la funzione definita dall'utente (UDF) JavaScript da utilizzare. Ad esempio, gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName: il nome della funzione definita dall'utente (UDF) JavaScript da utilizzare. Ad esempio, se il codice della funzione JavaScript è myTransform(inJson) { /*...do stuff...*/ }, il nome della funzione è myTransform. Per esempi di funzioni JavaScript definite dall'utente, vedi Esempi di UDF (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes: specifica la frequenza con cui ricaricare la funzione definita dall'utente, in minuti. Se il valore è maggiore di 0, Dataflow controlla periodicamente il file UDF in Cloud Storage e ricarica la UDF se il file viene modificato. Questo parametro ti consente di aggiornare la UDF mentre la pipeline è in esecuzione, senza dover riavviare il job. Se il valore è 0, il ricaricamento delle UDF è disattivato. Il valore predefinito è 0.
  • elasticsearchUsername: il nome utente Elasticsearch per l'autenticazione. Se specificato, il valore di apiKey viene ignorato.
  • elasticsearchPassword: la password di Elasticsearch per l'autenticazione. Se specificato, il valore di apiKey viene ignorato.
  • batchSize: le dimensioni del batch in numero di documenti. Il valore predefinito è 1000.
  • batchSizeBytes: le dimensioni del batch in numero di byte. Il valore predefinito è 5242880 (5 MB).
  • maxRetryAttempts: il numero massimo di tentativi. Deve essere maggiore di zero. Il valore predefinito è no retries.
  • maxRetryDuration: la durata massima dei tentativi in millisecondi. Deve essere maggiore di zero. Il valore predefinito è no retries.
  • propertyAsIndex: la proprietà nel documento in fase di indicizzazione il cui valore specifica i metadati _index da includere con il documento nelle richieste collettive. Ha la precedenza su una UDF _index. Il valore predefinito è none.
  • javaScriptIndexFnGcsPath: il percorso Cloud Storage all'origine della funzione definita dall'utente JavaScript per una funzione che specifica i metadati _index da includere con il documento nelle richieste collettive. Il valore predefinito è none.
  • javaScriptIndexFnName: il nome della funzione JavaScript definita dall'utente che specifica i metadati _index da includere nel documento nelle richieste collettive. Il valore predefinito è none.
  • propertyAsId: una proprietà nel documento in fase di indicizzazione il cui valore specifica i metadati _id da includere nel documento nelle richieste collettive. Ha la precedenza su una UDF _id. Il valore predefinito è none.
  • javaScriptIdFnGcsPath: il percorso Cloud Storage all'origine della funzione JavaScript definita dall'utente per la funzione che specifica i metadati _id da includere nel documento nelle richieste collettive. Il valore predefinito è none.
  • javaScriptIdFnName: il nome della funzione JavaScript definita dall'utente che specifica i metadati _id da includere con il documento nelle richieste collettive. Il valore predefinito è none.
  • javaScriptTypeFnGcsPath: il percorso Cloud Storage all'origine della funzione definita dall'utente JavaScript per una funzione che specifica i metadati _type da includere con i documenti nelle richieste collettive. Il valore predefinito è none.
  • javaScriptTypeFnName: il nome della funzione JavaScript definita dall'utente che specifica i metadati _type da includere nel documento nelle richieste collettive. Il valore predefinito è none.
  • javaScriptIsDeleteFnGcsPath: il percorso Cloud Storage all'origine della funzione definita dall'utente JavaScript per la funzione che determina se eliminare il documento anziché inserirlo o aggiornarlo. La funzione restituisce un valore stringa di true o false. Il valore predefinito è none.
  • javaScriptIsDeleteFnName: il nome della funzione JavaScript definita dall'utente che determina se eliminare il documento anziché inserirlo o aggiornarlo. La funzione restituisce un valore stringa di true o false. Il valore predefinito è none.
  • usePartialUpdate: indica se utilizzare aggiornamenti parziali (aggiornamento anziché creazione o indicizzazione, consentendo documenti parziali) con le richieste Elasticsearch. Il valore predefinito è false.
  • bulkInsertMethod: indica se utilizzare INDEX (indice, consente upsert) o CREATE (crea, errori in caso di _id duplicato) con le richieste collettive Elasticsearch. Il valore predefinito è CREATE.
  • trustSelfSignedCerts: indica se considerare attendibile o meno il certificato autofirmato. Un'istanza Elasticsearch installata potrebbe avere un certificato autofirmato. Imposta questo valore su true per ignorare la convalida del certificato SSL. (Il valore predefinito è false).
  • disableCertificateValidation: se true, considera attendibile il certificato SSL autofirmato. Un'istanza Elasticsearch potrebbe avere un certificato autofirmato. Per ignorare la convalida del certificato, imposta questo parametro su true. Il valore predefinito è false.
  • apiKeyKMSEncryptionKey: la chiave Cloud KMS per decriptare la chiave API. Questo parametro è obbligatorio se apiKeySource è impostato su KMS. Se viene fornito questo parametro, trasmetti una stringa apiKey criptata. Cripta i parametri utilizzando l'endpoint di crittografia dell'API KMS. Per la chiave, utilizza il formato projects/<PROJECT_ID>/locations/<KEY_REGION>/keyRings/<KEY_RING>/cryptoKeys/<KMS_KEY_NAME>. Consulta: https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt. Ad esempio, projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name.
  • apiKeySecretId: l'ID secret di Secret Manager per l'apiKey. Se apiKeySource è impostato su SECRET_MANAGER, fornisci questo parametro. Utilizza il formato projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. For example, projects/your-project-id/secrets/your-secret/versions/your-secret-version`.
  • apiKeySource: l'origine della chiave API. I valori consentiti sono PLAINTEXT, KMS e SECRET_MANAGER. Questo parametro è obbligatorio quando utilizzi Secret Manager o KMS. Se apiKeySource è impostato su KMS, devono essere forniti apiKeyKMSEncryptionKey e l'apiKey criptata. Se apiKeySource è impostato su SECRET_MANAGER, deve essere fornito apiKeySecretId. Se apiKeySource è impostato su PLAINTEXT, deve essere fornito apiKey. Il valore predefinito è PLAINTEXT.
  • socketTimeout: se impostato, sovrascrive il timeout massimo predefinito per i tentativi e il timeout predefinito del socket (30.000 ms) in Elastic RestClient.

Funzioni definite dall'utente

Questo modello supporta le funzioni definite dall'utente (UDF) in diversi punti della pipeline, descritti di seguito. Per ulteriori informazioni, consulta Creare funzioni definite dall'utente per i modelli Dataflow.

Funzione di trasformazione del testo

Trasforma il messaggio Pub/Sub in un documento Elasticsearch.

Parametri del modello:

  • javascriptTextTransformGcsPath: l'URI Cloud Storage del file JavaScript.
  • javascriptTextTransformFunctionName: il nome della funzione JavaScript.

Specifiche della funzione:

  • Input: il campo dati del messaggio Pub/Sub, serializzato come stringa JSON.
  • Output: un documento JSON convertito in stringa da inserire in Elasticsearch.

Funzione Index

Restituisce l'indice a cui appartiene il documento.

Parametri del modello:

  • javaScriptIndexFnGcsPath: l'URI Cloud Storage del file JavaScript.
  • javaScriptIndexFnName: il nome della funzione JavaScript.

Specifiche della funzione:

  • Input: il documento Elasticsearch, serializzato come stringa JSON.
  • Output: il valore del campo dei metadati _index del documento.

Funzione ID documento

Restituisce l'ID documento.

Parametri del modello:

  • javaScriptIdFnGcsPath: l'URI Cloud Storage del file JavaScript.
  • javaScriptIdFnName: il nome della funzione JavaScript.

Specifiche della funzione:

  • Input: il documento Elasticsearch, serializzato come stringa JSON.
  • Output: il valore del campo dei metadati _id del documento.

Funzione di eliminazione dei documenti

Specifica se eliminare un documento. Per utilizzare questa funzione, imposta la modalità di inserimento collettivo su INDEX e fornisci una funzione ID documento.

Parametri del modello:

  • javaScriptIsDeleteFnGcsPath: l'URI Cloud Storage del file JavaScript.
  • javaScriptIsDeleteFnName: il nome della funzione JavaScript.

Specifiche della funzione:

  • Input: il documento Elasticsearch, serializzato come stringa JSON.
  • Output: restituisci la stringa "true" per eliminare il documento o "false" per inserirlo.

Funzione Tipo di mappatura

Restituisce il tipo di mappatura del documento.

Parametri del modello:

  • javaScriptTypeFnGcsPath: l'URI Cloud Storage del file JavaScript.
  • javaScriptTypeFnName: il nome della funzione JavaScript.

Specifiche della funzione:

  • Input: il documento Elasticsearch, serializzato come stringa JSON.
  • Output: il valore del campo dei metadati _type del documento.

Esegui il modello

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.

  5. Dal menu a discesa Modello di dataflow, seleziona the Pub/Sub to Elasticsearch template.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. Fai clic su Esegui job.

gcloud

Nella shell o nel terminale, esegui il modello:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_Elasticsearch_Flex \
    --parameters \
inputSubscription=SUBSCRIPTION_NAME,\
connectionUrl=CONNECTION_URL,\
dataset=DATASET,\
namespace=NAMESPACE,\
apiKey=APIKEY,\
errorOutputTopic=ERROR_OUTPUT_TOPIC
  

Sostituisci quanto segue:

  • PROJECT_ID: l'ID progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome univoco del job a tua scelta
  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • ERROR_OUTPUT_TOPIC: l'argomento Pub/Sub per l'output degli errori
  • SUBSCRIPTION_NAME: il nome della tua sottoscrizione Pub/Sub
  • CONNECTION_URL: l'URL di Elasticsearch
  • DATASET: il tipo di log
  • NAMESPACE: il tuo spazio dei nomi per il set di dati
  • APIKEY: la chiave API codificata in base64 per l'autenticazione

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "SUBSCRIPTION_NAME",
          "connectionUrl": "CONNECTION_URL",
          "dataset": "DATASET",
          "namespace": "NAMESPACE",
          "apiKey": "APIKEY",
          "errorOutputTopic": "ERROR_OUTPUT_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_to_Elasticsearch_Flex",
   }
}
  

Sostituisci quanto segue:

  • PROJECT_ID: l'ID progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome univoco del job a tua scelta
  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • ERROR_OUTPUT_TOPIC: l'argomento Pub/Sub per l'output degli errori
  • SUBSCRIPTION_NAME: il nome della tua sottoscrizione Pub/Sub
  • CONNECTION_URL: l'URL di Elasticsearch
  • DATASET: il tipo di log
  • NAMESPACE: il tuo spazio dei nomi per il set di dati
  • APIKEY: la chiave API codificata in base64 per l'autenticazione

Passaggi successivi