Modello Streaming Data Generator a Pub/Sub, BigQuery e Cloud Storage

Il modello Generatore di dati di streaming viene utilizzato per generare un numero illimitato o fisso di record sintetici o messaggi in base allo schema fornito dall'utente alla frequenza specificata. Le destinazioni compatibili includono argomenti Pub/Sub, tabelle BigQuery e bucket Cloud Storage.

Di seguito sono riportati alcuni possibili casi d'uso:

  • Simula la pubblicazione di eventi in tempo reale su larga scala in un argomento Pub/Sub per misurare e determinare il numero e le dimensioni dei consumer necessari per elaborare gli eventi pubblicati.
  • Genera dati sintetici in una tabella BigQuery o in un bucket Cloud Storage per valutare i benchmark delle prestazioni o fungere da prova di fattibilità.

Sink e formati di codifica supportati

La tabella seguente descrive i sink e i formati di codifica supportati da questo modello:
JSON Avro Parquet
Pub/Sub No
BigQuery No No
Cloud Storage

Requisiti della pipeline

  • L'account di servizio worker deve disporre del ruolo assegnato Dataflow Worker (roles/dataflow.worker). Per maggiori informazioni, consulta Introduzione a IAM.
  • Crea un file di schema che contenga un modello JSON per i dati generati. Questo modello utilizza la libreria JSON Data Generator, quindi puoi fornire varie funzioni faker per ogni campo dello schema. Per saperne di più, consulta la documentazione di json-data-generator.

    Ad esempio:

    {
      "id": {{integer(0,1000)}},
      "name": "{{uuid()}}",
      "isInStock": {{bool()}}
    }
    
  • Carica il file dello schema in un bucket Cloud Storage.
  • La destinazione di output deve esistere prima dell'esecuzione. La destinazione deve essere un argomento Pub/Sub, una tabella BigQuery o un bucket Cloud Storage, a seconda del tipo di sink.
  • Se la codifica di output è Avro o Parquet, crea un file dello schema Avro e archivialo in una posizione Cloud Storage.
  • Assegna all'account di servizio worker un ruolo IAM aggiuntivo a seconda della destinazione desiderata.
    Destinazione Ruolo IAM necessario aggiuntivo A quale risorsa applicare
    Pub/Sub Publisher Pub/Sub (roles/pubsub.publisher)
    (per maggiori informazioni, vedi Controllo dell'controllo dell'accesso Pub/Sub con IAM)
    Argomento Pub/Sub
    BigQuery Editor dati BigQuery (roles/bigquery.dataEditor)
    (per ulteriori informazioni, vedi Controllo dell'controllo dell'accesso BigQuery con IAM)
    Set di dati BigQuery
    Cloud Storage Amministratore oggetti Storage (roles/storage.objectAdmin)
    (per ulteriori informazioni, consulta Controllo dell'controllo dell'accesso a Cloud Storage con IAM)
    Bucket Cloud Storage

Parametri del modello

Parametro Descrizione
schemaLocation Posizione del file dello schema. Ad esempio: gs://mybucket/filename.json.
qps Numero di messaggi da pubblicare al secondo. Ad esempio: 100.
sinkType (Facoltativo) Tipo di sink di output. I valori possibili sono PUBSUB, BIGQUERY, GCS. Il valore predefinito è PUBSUB.
outputType (Facoltativo) Tipo di codifica dell'output. I valori possibili sono JSON, AVRO, PARQUET. Il valore predefinito è JSON.
avroSchemaLocation (Facoltativo) Posizione del file dello schema AVRO. Obbligatorio quando outputType è AVRO o PARQUET. Ad esempio: gs://mybucket/filename.avsc.
topic (Facoltativo) Nome dell'argomento Pub/Sub a cui la pipeline deve pubblicare i dati. Obbligatorio quando sinkType è Pub/Sub. Ad esempio: projects/my-project-id/topics/my-topic-id.
outputTableSpec (Facoltativo) Nome della tabella BigQuery di output. Obbligatorio quando sinkType è BigQuery. Ad esempio: my-project-ID:my_dataset_name.my-table-name.
writeDisposition (Facoltativo) Istruzione di scrittura BigQuery. I valori possibili sono WRITE_APPEND, WRITE_EMPTY o WRITE_TRUNCATE. Il valore predefinito è WRITE_APPEND.
outputDeadletterTable (Facoltativo) Nome della tabella BigQuery di output in cui archiviare i record non riusciti. Se non viene fornita, la pipeline crea la tabella durante l'esecuzione con il nome {output_table_name}_error_records. Ad esempio: my-project-ID:my_dataset_name.my-table-name.
outputDirectory (Facoltativo) Percorso della posizione di output di Cloud Storage. Obbligatorio quando sinkType è Cloud Storage. Ad esempio: gs://mybucket/pathprefix/.
outputFilenamePrefix (Facoltativo) Il prefisso del nome file dei file di output scritti in Cloud Storage. Il valore predefinito è output-.
windowDuration (Facoltativo) Intervallo della finestra in cui l'output viene scritto in Cloud Storage. Il valore predefinito è 1 m (ovvero 1 minuto).
numShards (Facoltativo) Numero massimo di shard di output. Obbligatorio quando sinkType è Cloud Storage e deve essere impostato su 1 o un numero superiore.
messagesLimit (Facoltativo) Numero massimo di messaggi di output. Il valore predefinito è 0, che indica un numero illimitato.
autoscalingAlgorithm (Facoltativo) Algoritmo utilizzato per la scalabilità automatica dei worker. I valori possibili sono THROUGHPUT_BASED per attivare la scalabilità automatica o NONE per disattivarla.
maxNumWorkers (Facoltativo) Numero massimo di macchine worker. Ad esempio: 10.

Esegui il modello

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.

  5. Dal menu a discesa Modello di dataflow, seleziona the Streaming Data Generator template.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. Fai clic su Esegui job.

gcloud

Nella shell o nel terminale, esegui il modello:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Streaming_Data_Generator \
    --parameters \
schemaLocation=SCHEMA_LOCATION,\
qps=QPS,\
topic=PUBSUB_TOPIC
  

Sostituisci quanto segue:

  • PROJECT_ID: l'ID progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • JOB_NAME: un nome univoco del job a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • SCHEMA_LOCATION: il percorso del file dello schema in Cloud Storage. Ad esempio: gs://mybucket/filename.json.
  • QPS: il numero di messaggi da pubblicare al secondo
  • PUBSUB_TOPIC: l'argomento Pub/Sub di output. Ad esempio: projects/my-project-id/topics/my-topic-id.

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "schemaLocation": "SCHEMA_LOCATION",
          "qps": "QPS",
          "topic": "PUBSUB_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Streaming_Data_Generator",
   }
}
  

Sostituisci quanto segue:

  • PROJECT_ID: l'ID progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • JOB_NAME: un nome univoco del job a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • SCHEMA_LOCATION: il percorso del file dello schema in Cloud Storage. Ad esempio: gs://mybucket/filename.json.
  • QPS: il numero di messaggi da pubblicare al secondo
  • PUBSUB_TOPIC: l'argomento Pub/Sub di output. Ad esempio: projects/my-project-id/topics/my-topic-id.

Passaggi successivi