Fluxos de alterações do Spanner para o modelo do BigQuery

O modelo de fluxos de alterações do Spanner para BigQuery é um pipeline de streaming que transmite os registros de alteração de dados do Spanner e os grava em tabelas do BigQuery usando o Dataflow Runner V2.

Todas as colunas monitoradas do fluxo de alterações são incluídas em cada linha da tabela do BigQuery, independentemente de serem modificadas por uma transação do Spanner. As colunas não monitoradas não são incluídas na linha do BigQuery. Qualquer alteração do Spanner menor que a marca d'água do Dataflow é aplicada com êxito às tabelas do BigQuery ou armazenada na fila de mensagens inativas para nova tentativa. As linhas do BigQuery são inseridas fora de ordem em comparação com a ordem original do carimbo de data/hora de confirmação do Spanner.

Se as tabelas do BigQuery necessárias não existirem, o pipeline as criará. Caso contrário, as tabelas atuais do BigQuery serão usadas. O esquema das tabelas do BigQuery atuais precisa conter as colunas rastreadas correspondentes das tabelas do Spanner e qualquer outra coluna de metadados que não é ignorada explicitamente pela opção ignoreFields. Veja a descrição dos campos de metadados na lista a seguir. Cada nova linha do BigQuery inclui todas as colunas monitoradas pelo fluxo de alterações na linha correspondente na tabela do Spanner no carimbo de data/hora do registro de alterações.

Os seguintes campos de metadados são adicionados às tabelas do BigQuery. Para mais detalhes sobre esses campos, consulte Registros de alteração de dados em "Partições, registros e consultas de fluxos de alterações".

Ao usar esse modelo, lembre-se dos seguintes detalhes:

  • Esse modelo não propaga alterações de esquema do Spanner para o BigQuery. Como uma alteração de esquema no Spanner provavelmente vai interromper o pipeline, talvez ele precise ser recriado depois que o esquema for alterado.
  • Para os tipos de captura de valor OLD_AND_NEW_VALUES e NEW_VALUES, quando o registro de alteração de dados tiver uma alteração UPDATE, o modelo precisará fazer uma leitura desatualizada para o Spanner no carimbo de data/hora de confirmação do registro de alteração de dados. Isso serve para recuperar as colunas inalteradas, mas monitoradas. Configure o banco de dados "version_retention_period" corretamente para a leitura desatualizada. Para o tipo de captura de valor NEW_ROW, o modelo será mais eficiente, porque o registro de alteração de dados captura a nova linha completa, incluindo colunas que não são atualizadas em solicitações UPDATE, e o modelo não precisa fazer uma leitura desatualizada.
  • Para minimizar a latência e os custos de transporte da rede, execute o job do Dataflow na mesma região que a instância do Spanner ou as tabelas do BigQuery. Se você usar fontes, coletores, locais de arquivos de preparo ou de arquivos temporários localizados fora da região do job, seus dados poderão ser enviados entre regiões. Para mais informações, consulte Regiões do Dataflow.
  • Esse modelo é compatível com todos os tipos de dados válidos do Spanner. Se o tipo do BigQuery for mais preciso que o do Spanner, talvez ocorra perda de precisão durante a transformação. Especificamente:
    • Para o tipo JSON do Spanner, a ordem dos membros de um objeto é ordenada lexicograficamente, mas não há essa garantia para o tipo JSON do BigQuery.
    • O Spanner é compatível com o tipo TIMESTAMP de nanossegundos, mas o BigQuery é compatível apenas com o tipo TIMESTAMP de microssegundos.
  • Este modelo não oferece suporte ao uso da API BigQuery Storage Write no modo "exatamente uma vez".

Saiba mais sobre fluxos de alterações, como criar pipelines de mudança no pipeline do Dataflow e práticas recomendadas.

Requisitos de pipeline

  • A instância do Spanner precisa existir antes da execução do pipeline.
  • O banco de dados do Spanner precisa ser criado antes da execução do pipeline.
  • A instância de metadados do Spanner precisa existir antes da execução do pipeline.
  • O banco de dados de metadados do Spanner precisa existir antes da execução do pipeline.
  • O fluxo de alterações do Spanner precisa ser criado antes da execução do pipeline.
  • O conjunto de dados do BigQuery precisa existir antes da execução do pipeline.

Parâmetros do modelo

Parâmetros obrigatórios

  • spannerInstanceId: a instância do Spanner em que os fluxos de alterações serão lidos.
  • spannerDatabase: o banco de dados do Spanner de onde os fluxos de alterações serão lidos.
  • spannerMetadataInstanceId: a instância do Spanner a ser usada para a tabela de metadados do conector dos fluxos de alterações.
  • spannerMetadataDatabase: o banco de dados do Spanner a ser usado para a tabela de metadados do conector dos fluxos de alterações.
  • spannerChangeStreamName: o nome do fluxo de alterações a ser lido pelo Spanner.
  • bigQueryDataset: o conjunto de dados do BigQuery usado para a saída de fluxos de alterações.

Parâmetros opcionais

  • spannerProjectId: o projeto do qual os fluxos de alterações serão lidos. Esse valor também é o projeto em que a tabela de metadados do conector dos fluxos de alterações é criada. O valor padrão desse parâmetro é o projeto em que o pipeline do Dataflow está em execução.
  • spannerDatabaseRole : o papel do banco de dados do Spanner a ser usado ao executar o modelo. Esse parâmetro é necessário somente quando o principal do IAM que executa o modelo é um usuário de controle de acesso minucioso. A função de banco de dados precisa ter o privilégio SELECT no fluxo de alterações e o privilégio EXECUTE na função de leitura do fluxo de alterações. Para mais informações, consulte "Controle de acesso granular para fluxos de alteração" (https://cloud.google.com/spanner/docs/fgac-change-streams).
  • spannerMetadataTableName: o nome da tabela de metadados do conector dos fluxos de alterações do Spanner a ser usado. Se não for informado, uma tabela de metadados do conector de streams de alteração do Spanner será criada automaticamente durante o fluxo do pipeline. Você precisa fornecer esse parâmetro ao atualizar um pipeline atual. Caso contrário, não forneça esse parâmetro.
  • rpcPriority : a prioridade de solicitação das chamadas do Spanner. O valor precisa ser um destes valores: HIGH, MEDIUM ou LOW. O valor padrão é HIGH.
  • spannerHost: opcional: o endpoint do Cloud Spanner para chamar no modelo. Usado apenas para testes. Por exemplo: https://batch-spanner.googleapis.com.
  • startTimestamp : o DateTime inicial (https://datatracker.ietf.org/doc/html/rfc3339), inclusive, a ser usado em fluxos de alterações de leitura. Ex-2021-10-12T07:20:50.52Z. O padrão é o carimbo de data/hora em que o pipeline é iniciado, ou seja, o horário atual.
  • endTimestamp : o DateTime final (https://datatracker.ietf.org/doc/html/rfc3339), inclusive, para uso em fluxos de alterações de leitura.Ex-2021- 10-12T07:20:50.52Z. O padrão é um tempo infinito no futuro.
  • bigQueryProjectId: o projeto do BigQuery. O valor padrão é o projeto do job do Dataflow.
  • bigQueryChangelogTableNameTemplate : o modelo do nome da tabela do BigQuery que contém o registro de alterações. Vai para o padrão: {_metadata_spanner_table_name}_changelog.
  • deadLetterQueueDirectory : o caminho para armazenar registros não processados. O caminho padrão é um diretório no local temporário do job do Dataflow. O valor padrão geralmente é suficiente.
  • dlqRetryMinutes: número de minutos entre novas tentativas de fila de mensagens inativas (DLQ). O valor padrão é 10.
  • ignoreFields : uma lista de campos separados por vírgulas (diferencia maiúsculas de minúsculas) a ser ignorados. Esses campos podem ser campos de tabelas monitoradas ou campos de metadados adicionados pelo pipeline. Campos ignorados não são inseridos no BigQuery. Quando você ignora o campo _metadata_spanner_table_name, o parâmetro bigQueryChangelogTableNameTemplate também é ignorado. O padrão é vazio.
  • disableDlqRetries: se as tentativas do DLQ serão desativadas ou não. O padrão é: falso.
  • useStorageWriteApi : se verdadeiro, o pipeline usa a API BigQuery Storage Write (https://cloud.google.com/bigquery/docs/write-api). O valor padrão é false. Para mais informações, consulte Como usar a API Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • useStorageWriteApiAtLeastOnce : ao usar a API Storage Write, especifica a semântica de gravação. Para usar pelo menos uma semântica (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), defina esse parâmetro como true. Para usar semântica exatamente uma vez, defina o parâmetro como false. Esse parâmetro se aplica apenas quando useStorageWriteApi é true. O valor padrão é false.
  • numStorageWriteApiStreams : ao usar a API Storage Write, especifica o número de fluxos de gravação. Se useStorageWriteApi for true e useStorageWriteApiAtLeastOnce for false, será necessário definir esse parâmetro. Padrão: 0.
  • storageWriteApiTriggeringFrequencySec: ao usar a API Storage Write, especifica a frequência de acionamento, em segundos. Se useStorageWriteApi for true e useStorageWriteApiAtLeastOnce for false, você precisará definir esse parâmetro.

Executar o modelo

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Cloud Spanner change streams to BigQuery template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_BigQuery \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
bigQueryDataset=BIGQUERY_DATASET

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • REGION_NAME: a região em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • SPANNER_INSTANCE_ID: ID da instância do Spanner
  • SPANNER_DATABASE: banco de dados do Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID da instância de metadados do Spanner
  • SPANNER_METADATA_DATABASE: banco de dados de metadados do Spanner
  • SPANNER_CHANGE_STREAM: fluxo de alterações do Spanner
  • BIGQUERY_DATASET: o conjunto de dados do BigQuery usado para a saída de fluxos de alterações

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "bigQueryDataset": "BIGQUERY_DATASET"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_BigQuery",
   }
}

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • LOCATION: a região em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • SPANNER_INSTANCE_ID: ID da instância do Spanner
  • SPANNER_DATABASE: banco de dados do Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID da instância de metadados do Spanner
  • SPANNER_METADATA_DATABASE: banco de dados de metadados do Spanner
  • SPANNER_CHANGE_STREAM: fluxo de alterações do Spanner
  • BIGQUERY_DATASET: o conjunto de dados do BigQuery usado para a saída de fluxos de alterações

A seguir