Modello Argomento Pub/Sub o Abbonamento a file di testo su Cloud Storage

Il modello Pub/Sub Topic or Subscription to Cloud Storage Text è una pipeline di streaming che legge i record da Pub/Sub e li salva sotto forma di serie di file di Cloud Storage in formato testo. Il modello può essere utilizzato come mezzo rapido per salvare dati in Pub/Sub per uso futuro. Per impostazione predefinita, il modello genera un nuovo file ogni 5 minuti.

Requisiti della pipeline

  • L'argomento o la sottoscrizione Pub/Sub deve esistere prima dell'esecuzione.
  • I messaggi pubblicati nell'argomento devono essere in formato testo.
  • I messaggi pubblicati nell'argomento non devono contenere caratteri di fine riga. Tieni presente che ogni messaggio Pub/Sub viene salvato come singola riga nel file di output.

Parametri del modello

Parametri obbligatori

  • outputDirectory: il percorso e il prefisso del nome file in cui scrivere i file di output. Questo valore deve terminare con una barra. Ad esempio, gs://your-bucket/your-path/.

Parametri facoltativi

  • inputTopic: l'argomento Pub/Sub da cui leggere l'input. Se viene fornito questo parametro, non utilizzare inputSubscription. Ad esempio: projects/<PROJECT_ID>/topics/<TOPIC_NAME>.
  • inputSubscription: la sottoscrizione Pub/Sub da cui leggere l'input. Se viene fornito questo parametro, non utilizzare inputTopic. Ad esempio: projects/<PROJECT_ID>/subscription/<SUBSCRIPTION_NAME>.
  • userTempLocation: la directory fornita dall'utente in cui generare i file temporanei. Deve terminare con una barra.
  • outputFilenamePrefix: il prefisso da inserire in ogni file in finestra. Ad esempio, output-. Il valore predefinito è output.
  • outputFilenameSuffix: il suffisso da inserire in ogni file in finestra, in genere un'estensione di file come .txt o .csv. Ad esempio: .txt. Il valore predefinito è vuoto.
  • outputShardTemplate: il modello di shard definisce la parte dinamica di ogni file in finestra. Per impostazione predefinita, la pipeline utilizza un singolo shard per l'output nel file system all'interno di ogni finestra. Ciò significa che tutti i dati vengono inseriti in un unico file per finestra. outputShardTemplate è impostato su W-P-SS-of-NN per impostazione predefinita, dove W è l'intervallo di date della finestra, P sono le informazioni del riquadro, S è il numero di shard e N è il numero di shard. Nel caso di un singolo file, la parte SS-of-NN del outputShardTemplate è 00-of-01.
  • numShards: il numero massimo di shard di output prodotti durante la scrittura. Un numero maggiore di shard significa una velocità effettiva più elevata per la scrittura in Cloud Storage, ma potenzialmente un costo di aggregazione dei dati più elevato tra gli shard durante l'elaborazione dei file Cloud Storage di output. Il valore predefinito è 0.
  • windowDuration: la durata della finestra è l'intervallo in cui i dati vengono scritti nella directory di output. Configura la durata in base alla velocità effettiva della pipeline. Ad esempio, una velocità effettiva più elevata potrebbe richiedere dimensioni della finestra più piccole in modo che i dati rientrino nella memoria. Il valore predefinito è 5m (5 minuti), con un minimo di 1s (1 secondo). I formati consentiti sono: [int]s (per i secondi, ad esempio 5s), [int]m (per i minuti, ad esempio 12m), [int]h (per le ore, ad esempio 2h). Ad esempio, 5m.
  • yearPattern: pattern per la formattazione dell'anno. Deve essere uno o più dei seguenti valori: y o Y. Le maiuscole non fanno differenza nell'anno. Il pattern può essere racchiuso facoltativamente da caratteri non alfanumerici o dal carattere di directory (/). Il valore predefinito è YYYY.
  • monthPattern: pattern per la formattazione del mese. Deve essere uno o più caratteri M. Il pattern può essere racchiuso facoltativamente da caratteri non alfanumerici o dal carattere di directory (/). Il valore predefinito è MM.
  • dayPattern: pattern per la formattazione del giorno. Deve essere uno o più valori di d per il giorno del mese o di D per il giorno dell'anno. Le maiuscole non fanno differenza nell'anno. Il pattern può essere racchiuso facoltativamente da caratteri non alfanumerici o dal carattere di directory (/). Il valore predefinito è dd.
  • hourPattern: pattern per la formattazione dell'ora. Deve essere uno o più caratteri H. Il pattern può essere racchiuso facoltativamente da caratteri non alfanumerici o dal carattere di directory (/). Il valore predefinito è HH.
  • minutePattern: pattern per formattare i minuti. Deve essere uno o più caratteri m. Il pattern può essere racchiuso facoltativamente da caratteri non alfanumerici o dal carattere di directory (/). Il valore predefinito è mm.

Esegui il modello

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.

  5. Dal menu a discesa Modello di dataflow, seleziona the Pub/Sub Topic or Subscription to Text Files on Cloud Storage template.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. Fai clic su Esegui job.

gcloud

Nella shell o nel terminale, esegui il modello:

gcloud dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region REGION_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

Sostituisci quanto segue:

  • JOB_NAME: un nome univoco del job a tua scelta
  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • SUBSCRIPTION_NAME: il nome della tua sottoscrizione Pub/Sub
  • BUCKET_NAME: il nome del bucket Cloud Storage

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME"
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
    },
    "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex",
  }
}

Sostituisci quanto segue:

  • PROJECT_ID: l'ID progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome univoco del job a tua scelta
  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • SUBSCRIPTION_NAME: il nome della tua sottoscrizione Pub/Sub
  • BUCKET_NAME: il nome del bucket Cloud Storage

Passaggi successivi