Modello di flussi di modifiche di Spanner a Pub/Sub

Il modello di modifiche in tempo reale Spanner a Pub/Sub è una pipeline di streaming che trasmette i record delle modifiche dei dati Spanner e li scrive negli argomenti Pub/Sub utilizzando Dataflow Runner V2.

Per inviare i dati a un nuovo argomento Pub/Sub, devi prima creare l'argomento. Dopo la creazione, Pub/Sub genera e collega automaticamente una sottoscrizione al nuovo argomento. Se provi a inviare dati a un argomento Pub/Sub inesistente, la pipeline Dataflow genera un'eccezione e si blocca perché tenta continuamente di stabilire una connessione.

Se l'argomento Pub/Sub necessario esiste già, puoi inviare i dati a quell'argomento.

Per saperne di più, consulta Informazioni sui flussi di modifiche, Creare connessioni ai modifiche in tempo reale con Dataflow, e Best practice per i flussi di modifiche.

Requisiti della pipeline

  • L'istanza Spanner deve esistere prima dell'esecuzione della pipeline.
  • Il database Spanner deve esistere prima dell'esecuzione della pipeline.
  • L'istanza dei metadati di Spanner deve esistere prima dell'esecuzione della pipeline.
  • Il database dei metadati Spanner deve esistere prima dell'esecuzione della pipeline.
  • Lo stream di modifiche Spanner deve esistere prima dell'esecuzione della pipeline.
  • L'argomento Pub/Sub deve esistere prima dell'esecuzione della pipeline.

Parametri del modello

Parametri obbligatori

  • spannerInstanceId: l'istanza Spanner da cui leggere modifiche in tempo reale.
  • spannerDatabase: il database Spanner da cui leggere modifiche in tempo reale.
  • spannerMetadataInstanceId: l'istanza Spanner da utilizzare per la tabella dei metadati del connettore di modifiche in tempo reale.
  • spannerMetadataDatabase: il database Spanner da utilizzare per la tabella dei metadati del connettore di modifiche in tempo reale.
  • spannerChangeStreamName: il nome della modifica in tempo reale di Spanner da cui leggere.
  • pubsubTopic: l'argomento Pub/Sub per l'output modifiche in tempo reale.

Parametri facoltativi

  • spannerProjectId: il progetto da cui leggere modifiche in tempo reale. In questo progetto viene creata anche la tabella dei metadati del connettore di modifiche in tempo reale. Il valore predefinito di questo parametro è il progetto in cui è in esecuzione la pipeline Dataflow.
  • spannerDatabaseRole: il ruolo database di Spanner da utilizzare durante l'esecuzione del modello. Questo parametro è obbligatorio solo quando l'entità IAM che esegue il modello è un utente con controllo dell'accesso dell'accesso granulare. Il ruolo del database deve disporre del privilegio SELECT sul flusso di modifiche e del privilegio EXECUTE sulla funzione di lettura del flusso di modifiche. Per saperne di più, consulta Controllo dell'accesso granulare per gli modifiche in tempo reale (https://cloud.google.com/spanner/docs/fgac-change-streams).
  • spannerMetadataTableName: il nome della tabella dei metadati del connettore Spanner modifiche in tempo reale da utilizzare. Se non viene fornita, Spanner crea automaticamente la tabella dei metadati del connettore di flussi durante la modifica del flusso della pipeline. Devi fornire questo parametro quando aggiorni una pipeline esistente. Non utilizzare questo parametro per altri casi.
  • startTimestamp: la data e l'ora di inizio (https://tools.ietf.org/html/rfc3339), inclusa, da utilizzare per la lettura dei modifiche in tempo reale. Ad esempio, ex- 2021-10-12T07:20:50.52Z. Il valore predefinito è il timestamp dell'inizio della pipeline, ovvero l'ora attuale.
  • endTimestamp: la data e l'ora di fine (https://tools.ietf.org/html/rfc3339), inclusa, da utilizzare per la lettura dei modifiche in tempo reale. Ad esempio, ex- 2021-10-12T07:20:50.52Z. Il valore predefinito è un tempo infinito nel futuro.
  • spannerHost: l'endpoint Cloud Spanner da chiamare nel modello. Utilizzato solo per i test. Ad esempio, https://spanner.googleapis.com. Il valore predefinito è https://spanner.googleapis.com.
  • outputDataFormat: il formato dell'output. L'output viene racchiuso in molti PubsubMessage e inviato a un argomento Pub/Sub. I formati consentiti sono JSON e AVRO. Il valore predefinito è JSON.
  • pubsubAPI: l'API Pub/Sub utilizzata per implementare la pipeline. Le API consentite sono pubsubio e native_client. Per un numero ridotto di query al secondo (QPS), native_client ha una latenza inferiore. Per un numero elevato di QPS, pubsubio offre prestazioni migliori e più stabili. Il valore predefinito è pubsubio.
  • pubsubProjectId: progetto dell'argomento Pub/Sub. Il valore predefinito di questo parametro è il progetto in cui è in esecuzione la pipeline Dataflow.
  • rpcPriority: la priorità della richiesta per le chiamate Spanner. I valori consentiti sono HIGH, MEDIUM e LOW. Il valore predefinito è HIGH.
  • includeSpannerSource: indica se includere o meno l'ID database Spanner e l'ID istanza per leggere lo stream delle modifiche nei dati del messaggio di output. Il valore predefinito è false.
  • outputMessageMetadata: il valore stringa per il campo personalizzato outputMessageMetadata nel messaggio Pub/Sub di output. Il valore predefinito è vuoto e il campo outputMessageMetadata viene compilato solo se questo valore non è vuoto. Quando inserisci il valore qui, utilizza il codice di escape per i caratteri speciali(ad es. le virgolette doppie).

Esegui il modello

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.

  5. Dal menu a discesa Modello di dataflow, seleziona the Cloud Spanner change streams to Pub/Sub template.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. Fai clic su Esegui job.

gcloud

Nella shell o nel terminale, esegui il modello:

    gcloud dataflow flex-template run JOB_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_PubSub \
        --region REGION_NAME \
        --parameters \
    spannerInstanceId=SPANNER_INSTANCE_ID,\
    spannerDatabase=SPANNER_DATABASE,\
    spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
    spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
    spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
    pubsubTopic=PUBSUB_TOPIC
    

Sostituisci quanto segue:

  • JOB_NAME: un nome univoco del job a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • SPANNER_INSTANCE_ID: ID istanza Spanner
  • SPANNER_DATABASE: Database Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID istanza dei metadati Spanner
  • SPANNER_METADATA_DATABASE: Database dei metadati di Spanner
  • SPANNER_CHANGE_STREAM: Modifica in tempo reale di Spanner
  • PUBSUB_TOPIC: L'argomento Pub/Sub per l'output degli modifiche in tempo reale

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

  POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
  {
    "launch_parameter": {
        "jobName": "JOB_NAME",
        "parameters": {
            "spannerInstanceId": "SPANNER_INSTANCE_ID",
            "spannerDatabase": "SPANNER_DATABASE",
            "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
            "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
            "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
            "pubsubTopic": "PUBSUB_TOPIC"
        },
        "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_PubSub",
    }
  }
  

Sostituisci quanto segue:

  • PROJECT_ID: l'ID progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome univoco del job a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • SPANNER_INSTANCE_ID: ID istanza Spanner
  • SPANNER_DATABASE: Database Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID istanza dei metadati Spanner
  • SPANNER_METADATA_DATABASE: Database dei metadati di Spanner
  • SPANNER_CHANGE_STREAM: Modifica in tempo reale di Spanner
  • PUBSUB_TOPIC: L'argomento Pub/Sub per l'output degli modifiche in tempo reale

Passaggi successivi