Gerenciar streams

Nesta página, você vai aprender a usar a API Datastream para:

  • Criar streams
  • Receber informações sobre fluxos e objetos de fluxos
  • Atualize os fluxos iniciando, pausando, retomando e modificando-os, bem como iniciando e interrompendo o preenchimento de objetos de fluxo
  • Recuperar streams com falha permanente
  • Ativar o streaming de objetos grandes para streams do Oracle
  • Excluir fluxos

Há duas maneiras de usar a API Datastream. É possível fazer chamadas da API REST ou usar a Google Cloud CLI (CLI).

Para informações gerais sobre como usar o Google Cloud CLI para gerenciar fluxos do Datastream, consulte fluxos do Datastream da CLI gcloud.

Criar um stream

Nesta seção, você vai aprender a criar um stream usado para transferir dados da origem para um destino. Os exemplos a seguir não são abrangentes, mas destacam recursos específicos do Datastream. Para resolver seu caso de uso específico, use estes exemplos com a documentação de referência da API do Datastream.

Esta seção aborda os seguintes casos de uso:

Exemplo 1: transmitir objetos específicos para o BigQuery

Neste exemplo, você vai aprender a:

  • Fazer streaming do MySQL para o BigQuery
  • Incluir um conjunto de objetos no fluxo
  • Definir o modo de gravação do stream como somente anexação
  • Fazer o preenchimento de todos os objetos incluídos no fluxo

Veja a seguir uma solicitação para extrair todas as tabelas de schema1 e duas tabelas específicas de schema2: tableA e tableC. Os eventos são gravados em um conjunto de dados no BigQuery.

A solicitação não inclui o parâmetro customerManagedEncryptionKey. Portanto, o sistema interno de gerenciamento de chaves Google Cloud é usado para criptografar seus dados em vez da CMEK.

O parâmetro backfillAll associado à execução do preenchimento histórico (ou snapshot) é definido como um dicionário vazio ({}), o que significa que o Datastream preenche os dados históricos de todas as tabelas incluídas no fluxo.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlCdcStream
{
  "displayName": "MySQL CDC to BigQuery",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          { "database": "schema1" },
          {
            "database": "schema2",
            "mysqlTables": [
              {
                "table": "tableA",
                "table": "tableC"
              }
            ]
          }
        ]
      },
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
          "location": "us",
          "datasetIdPrefix": "prefix_"
        }
      },
      "dataFreshness": "900s"
    }
  },
  "backfillAll": {}
}

gcloud

Para mais informações sobre como usar gcloud para criar um fluxo, consulte a documentação do SDK Google Cloud.

Exemplo 2: excluir objetos específicos de um fluxo com uma origem do PostgreSQL

Neste exemplo, você vai aprender a:

  • Fazer streaming do PostgreSQL para o BigQuery
  • Excluir objetos do fluxo
  • Excluir objetos do preenchimento

O código a seguir mostra uma solicitação para criar um stream usado para transferir dados de um banco de dados PostgreSQL de origem para o BigQuery. Ao criar um stream de um banco de dados de origem do PostgreSQL, é necessário especificar dois campos adicionais específicos do PostgreSQL na solicitação:

  • replicationSlot: um slot de replicação é um pré-requisito para configurar um banco de dados do PostgreSQL para replicação. É necessário criar um slot de replicação para cada fluxo.
  • publication: uma publicação é um grupo de tabelas de que você quer replicar as mudanças. O nome da publicação precisa existir no banco de dados antes de iniciar um stream. No mínimo, a publicação precisa incluir as tabelas especificadas na lista includeObjects do stream.

O parâmetro backfillAll associado à execução do preenchimento histórico (ou snapshot) está definido para excluir uma tabela.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myPostgresStream
{
  "displayName": "PostgreSQL to BigQueryCloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/postgresCp",
    "postgresqlSourceConfig": {
      "replicationSlot": "replicationSlot1",
      "publication": "publicationA",
      "includeObjects": {
        "postgresqlSchemas": {
          "schema": "schema1"
        }
      },
      "excludeObjects": {
        "postgresqlSchemas": [
          { "schema": "schema1",
        "postgresqlTables": [
          {
            "table": "tableA",
            "postgresqlColumns": [
              { "column": "column5" }
              ]
              }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "dataFreshness": "900s",
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
           "location": "us",
           "datasetIdPrefix": "prefix_"
        }
      }
    }
  },
  "backfillAll": {
    "postgresqlExcludedObjects": {
        "postgresqlSchemas": [
          { "schema": "schema1",
            "postgresqlTables": [
              { "table": "tableA" }
            ]
          }
        ]
      }
    }
  }

gcloud

Para mais informações sobre como usar gcloud para criar um fluxo, consulte a documentação do SDK Google Cloud.

Exemplo 3: especificar o modo de gravação somente de anexação para um stream

Ao fazer streaming para o BigQuery, é possível definir o modo de gravação: merge ou appendOnly. Para mais informações, consulte Configurar o modo de gravação.

Se você não especificar o modo de gravação na solicitação para criar um stream, o modo padrão merge será usado.

A solicitação a seguir mostra como definir o modo appendOnly ao criar um fluxo do MySQL para o BigQuery.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=appendOnlyStream
{
  "displayName": "My append-only stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          { "database": "myMySqlDb"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
          "location": "us",
          "datasetIdPrefix": "prefix_"
        }
      },
      "appendOnly": {}
    }
  },
  "backfillAll": {}
}

gcloud

Para mais informações sobre como usar gcloud para criar um fluxo, consulte a documentação do SDK Google Cloud.

Exemplo 4: transmitir para um destino do Cloud Storage

Neste exemplo, você vai aprender a:

  • Fazer streaming do Oracle para o Cloud Storage
  • Definir um conjunto de objetos para incluir no stream
  • Definir a CMEK para criptografar dados em repouso

A solicitação a seguir mostra como criar um stream que grava os eventos em um bucket no Cloud Storage.

Neste exemplo de solicitação, os eventos são gravados no formato de saída JSON, e um novo arquivo é criado a cada 100 MB ou 30 segundos (substituindo os valores padrão de 50 MB e 60 segundos).

Para o formato JSON, é possível:

  • Inclua um arquivo de esquema de tipos unificados no caminho. Como resultado, o Datastream grava dois arquivos no Cloud Storage: um arquivo de dados JSON e um arquivo de esquema Avro. O arquivo de esquema tem o mesmo nome do arquivo de dados, com uma extensão .schema.

  • Ative a compactação gzip para que o Datastream compacte os arquivos gravados no Cloud Storage.

Ao usar o parâmetro backfillNone, a solicitação especifica que apenas as mudanças em andamento são transmitidas para o destino, sem preenchimento retroativo.

A solicitação especifica o parâmetro de chave de criptografia gerenciada pelo cliente, que permite controlar as chaves usadas para criptografar dados em repouso em um projeto Google Cloud . O parâmetro se refere à CMEK que o Datastream usa para criptografar os dados transmitidos da origem para o destino. Ele também especifica o keyring da CMEK.

Para mais informações sobre keyrings, consulte Recursos do Cloud KMS. Para mais informações sobre como proteger seus dados usando chaves de criptografia, consulte o Cloud Key Management Service (KMS).

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleCdcStream
{
  "displayName": "Oracle CDC to Cloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/
    connectionProfiles/OracleCp",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "GcsBucketCp",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "jsonFileFormat": {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      },
      "fileRotationMb": 100,
      "fileRotationInterval": 30
    }
  },
  "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/
  keyRings/myRing/cryptoKeys/myEncryptionKey",
  "backfillNone": {}
}

gcloud

Para mais informações sobre como usar gcloud para criar um fluxo, consulte a documentação do SDK Google Cloud.

Exemplo 5: fazer streaming para uma tabela gerenciada do BigLake

Neste exemplo, você vai aprender a configurar um stream para replicar dados de um banco de dados MySQL para uma tabela do BigLake Iceberg no modo append-only. Antes de criar a solicitação, verifique se você concluiu as etapas a seguir:

  • Tenha um bucket do Cloud Storage em que você quer armazenar seus dados
  • Criar uma conexão de recursos do Cloud
  • Conceder à sua conexão de recursos do Cloud acesso ao bucket do Cloud Storage

Em seguida, use a solicitação a seguir para criar seu stream:

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlBigLakeStream
{
  "displayName": "MySQL to BigLake stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlBigLakeCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          {
            "database": "my-mysql-database"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "projects/myProjectId1/locations/us-central1/connectionProfiles/my-bq-cp-id",
    "bigqueryDestinationConfig": {
      "blmtConfig": {
        "bucket": "my-gcs-bucket-name",
        "rootPath": "my/folder",
        "connectionName": "my-project-id.us-central1.my-bigquery-connection-name",
        "fileFormat": "PARQUET",
        "tableFormat": "ICEBERG"
        },
      "singleTargetDataset": {
        "datasetId": "my-project-id:my-bigquery-dataset-id"
      },
      "appendOnly": {}
    }
  },
  "backfillAll": {}
}

gcloud

datastream streams create mysqlBigLakeStream --location=us-central1
--display-name=mysql-to-bl-stream --source=source --mysql-source-config=mysql_source_config.json
--destination=destination --bigquery-destination-config=bl_config.json
--backfill-none

O conteúdo do arquivo de configuração de origem mysql_source_config.json:

{"excludeObjects": {}, "includeObjects": {"mysqlDatabases":[{"database":"my-mysql-database"}]}}

Conteúdo do arquivo de configuração bl_config.json:

{ "blmtConfig": { "bucket": "my-gcs-bucket-name", "rootPath": "my/folder", "connectionName": "my-project-id.us-central1.my-bigquery-connection-name", "fileFormat": "PARQUET", "tableFormat": "ICEBERG" }, "singleTargetDataset": {"datasetId": "my-project-id:my-bigquery-dataset-id"}, "appendOnly": {} }

Terraform

resource "google_datastream_stream" "stream" {
  stream_id    = "mysqlBlStream"
  location     = "us-central1"
  display_name = "MySQL to BigLake stream"

  source_config {
    source_connection_profile = "/projects/myProjectId1/locations/us-central1/streams/mysqlBlCp"
    mysql_source_config {
      include_objects {
        mysql_databases {
          database = "my-mysql-database"
        }
      }
    }
  }

  destination_config {
    destination_connection_profile = "projects/myProjectId1/locations/us-central1/connectionProfiles/my-bq-cp-id"
    bigquery_destination_config {
      single_target_dataset {
        dataset_id = "my-project-id:my-bigquery-dataset-id"
      }
      blmt_config {
        bucket          = "my-gcs-bucket-name"
        table_format    = "ICEBERG"
        file_format     = "PARQUET"
        connection_name = "my-project-id.us-central1.my-bigquery-connection-name"
        root_path       = "my/folder"
      }
      append_only {}
    }
  }

  backfill_none {}
}
    

Validar a definição de um stream

Antes de criar um fluxo, é possível validar a definição dele. Dessa forma, é possível garantir que todas as verificações de validação sejam aprovadas e que o stream seja executado com êxito quando criado.

Como validar verificações de stream:

  • Indica se a fonte está configurada corretamente para permitir que o Datastream faça streaming de dados.
  • Se a transmissão pode se conectar à origem e ao destino.
  • A configuração completa do stream.

Para validar um fluxo, adicione &validate_only=true ao URL antes do corpo da solicitação:

POST "https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams?streamId=STREAM_ID&validate_only=true"

Depois de fazer essa solicitação, você verá as verificações de validação que o Datastream executa para sua origem e seu destino e se as verificações serão aprovadas ou falharão. Para qualquer verificação de validação que não seja aprovada, serão exibidas informações sobre o motivo da falha e o que fazer para corrigir o problema.

Por exemplo, suponha que você tenha uma chave de criptografia gerenciada pelo cliente (CMEK) que quer que o Datastream use para criptografar dados transmitidos da origem para o destino. Como parte da validação do stream, o Datastream verifica se a chave existe e se ele tem permissões para usá-la. Se uma dessas condições não for atendida, ao validar o stream, a seguinte mensagem de erro será retornada:

CMEK_DOES_NOT_EXIST_OR_MISSING_PERMISSIONS

Para resolver esse problema, verifique se a chave fornecida existe e se a conta de serviço do Datastream tem a permissão cloudkms.cryptoKeys.get para a chave.

Depois de fazer as correções apropriadas, faça a solicitação novamente para garantir que todas as verificações de validação sejam aprovadas. No exemplo anterior, a verificação CMEK_VALIDATE_PERMISSIONS não vai mais retornar uma mensagem de erro, mas terá o status PASSED.

Receber informações sobre um fluxo

O código a seguir mostra uma solicitação para recuperar informações sobre um stream. Exemplos dessas informações:

  • O nome do fluxo (identificador exclusivo)
  • Um nome fácil de usar para a transmissão (nome de exibição)
  • Carimbos de data/hora de quando o stream foi criado e atualizado pela última vez
  • Informações sobre os perfis de conexão de origem e de destino associados ao stream
  • O estado do stream

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams/STREAM_ID

A resposta é exibida da seguinte maneira:

{
  "name": "myOracleCdcStream",
  "displayName": "Oracle CDC to Cloud Storage",
  "createTime": "2019-12-15T15:01:23.045123456Z",
  "updateTime": "2019-12-15T15:01:23.045123456Z",
  "sourceConfig": {
    "sourceConnectionProfileName": "myOracleDb",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1"
          },
          {
            "schema": "schema3",
            "oracleTables": [
              { "table": "tableA" },
              { "table": "tableC" }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "myGcsBucket",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "avroFileFormat": {},
      "fileRotationMb": 100,
      "fileRotationInterval": 60
    }
  },
  "state": "RUNNING"
  "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/
  keyRings/myRing/cryptoKeys/myEncryptionKey",
  "backfillAll": {}
}

gcloud

Para mais informações sobre como usar gcloud para recuperar informações sobre seu fluxo, consulte a documentação do SDK Google Cloud.

Listar fluxos

O código a seguir mostra uma solicitação para recuperar uma lista de todos os streams no projeto e local especificados.

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams

gcloud

Para mais informações sobre como usar gcloud para recuperar informações sobre todos os seus fluxos, consulte a documentação do SDK Google Cloud.

Listar objetos de um fluxo

O código a seguir mostra uma solicitação para recuperar informações sobre todos os objetos de um stream.

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects

gcloud

Para mais informações sobre como usar gcloud para recuperar informações sobre todos os objetos do seu stream, consulte a documentação do SDK Google Cloud.

A lista de objetos retornada pode ser semelhante a esta:

REST

{
  "streamObjects": [
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object1",
      "displayName": "employees.salaries",
      "backfillJob": {
        "state": "ACTIVE",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T12:12:26.344878Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "salaries"
        }
      }
    },
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object2",
      "displayName": "contractors.hours",
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "contractors",
          "table": "hours"
        }
      }
    },
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object3",
      "displayName": "employees.departments",
      "backfillJob": {
        "state": "COMPLETED",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T11:26:12.869880Z",
        "lastEndTime": "2021-10-18T11:26:28.405653Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "departments"
        }
      }
    }
  ]
}

gcloud

Para mais informações sobre como usar gcloud para listar objetos de um fluxo, consulte a documentação do SDK Google Cloud.

Iniciar um stream

O código a seguir mostra uma solicitação para iniciar um fluxo.

Usando o parâmetro updateMask na solicitação, apenas os campos que você especificar precisam ser incluídos no corpo da solicitação. Para iniciar um stream, mude o valor no campo state de CREATED para RUNNING.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "RUNNING"
}

gcloud

Para mais informações sobre como usar gcloud para iniciar seu stream, consulte a documentação do SDK Google Cloud.

Pausar um stream

O código a seguir mostra uma solicitação para pausar um fluxo em execução.

Neste exemplo, o campo especificado no parâmetro updateMask é state. Ao pausar o stream, você muda o estado dele de RUNNING para PAUSED.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "PAUSED"
}

gcloud

Para mais informações sobre como usar gcloud para pausar seu stream, consulte a documentação do SDK Google Cloud.

Retomar um stream

O código a seguir mostra uma solicitação para retomar um stream pausado.

Neste exemplo, o campo especificado no parâmetro updateMask é state. Ao retomar o fluxo, você muda o estado dele de PAUSED de volta para RUNNING.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "RUNNING"
}

gcloud

Para mais informações sobre como usar gcloud para retomar o stream, consulte a documentação do SDK Google Cloud.

Recuperar um stream

É possível recuperar uma transmissão que falhou permanentemente usando o método RunStream. Cada tipo de banco de dados de origem tem uma definição própria de quais operações de recuperação de fluxo são possíveis. Para mais informações, consulte Recuperar um stream.

Recuperar um stream para uma origem MySQL ou Oracle

Os exemplos de código a seguir mostram solicitações para recuperar um stream de uma origem MySQL ou Oracle de várias posições de arquivo de registro:

REST

Recupera um stream da posição atual. Esta é a opção padrão:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

Recuperar um stream da próxima posição disponível:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "nextAvailableStartPosition": {}
  }
}

Recuperar um stream da posição mais recente:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "mostRecentStartPosition": {}
  }
}

Recuperar um stream de uma posição específica (replicação baseada em binlog do MySQL):

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "NAME_OF_THE_LOG_FILE"
        "logPosition": POSITION
      }
    }
  }
}

Substitua:

  • NAME_OF_THE_LOG_FILE: o nome do arquivo de registro de que você quer recuperar o stream.
  • POSITION: a posição no arquivo de registro de onde você quer recuperar seu stream. Se você não fornecer o valor, o Datastream vai recuperar o fluxo do início do arquivo.

Exemplo:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "binlog.001"
        "logPosition": 4
      }
    }
  }
}

Recuperar um stream de uma posição específica (replicação baseada em GTID do MySQL):

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlGtidPosition": {
        "gtidSet": "GTID_SET"
      }
    }
  }
}

Substitua GTID_SET por um ou mais GTIDs únicos ou intervalos de GTIDs de que você quer recuperar seu fluxo.

Exemplo:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlGtidPosition": {
        "gtidSet": "22cc56f5-3862-379a-9af5-051c59baef9d:1-561143685:561143688-591036613,b8d7df02-832b-32b9-bec7-2018806b76f6:1-3"
      }
    }
  }
}

Recuperar um stream de uma posição específica (Oracle):

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": scn
      }
    }
  }
}
Substitua scn pelo número de alteração do sistema (SCN) no arquivo de registro de refazer de onde você quer recuperar seu fluxo. Este campo é obrigatório.

Exemplo:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": 234234
      }
    }
  }
}

Para mais informações sobre as opções de recuperação disponíveis, consulte Recuperar uma transmissão.

gcloud

Não é possível recuperar um fluxo usando gcloud.

Recuperar um fluxo de uma origem do PostgreSQL

O exemplo de código a seguir mostra uma solicitação para recuperar um stream de uma fonte do PostgreSQL. Durante a recuperação, o stream começa a ler o primeiro número de sequência de registro (LSN) no slot de replicação configurado para ele.

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

Exemplo:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run

Se você quiser mudar o slot de replicação, atualize o fluxo com o novo nome do slot primeiro:

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=sourceConfig.postgresqlSourceConfig.replicationSlot
{
  "sourceConfig": {
    "postgresqlSourceConfig": {
      "replicationSlot": "NEW_REPLICATION_SLOT_NAME"
    }
  }
}

gcloud

Não é possível recuperar um fluxo usando gcloud.

Recuperar um fluxo de uma fonte do SQL Server

Os exemplos de código a seguir mostram solicitações de exemplo para recuperar um stream de uma fonte do SQL Server.

REST

Recuperar um stream da primeira posição disponível:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

Exemplo:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/mySqlServerStreamId:run

Recuperar um stream de um número de sequência de registro preferido:

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "sqlServerLsnPosition": {
        "lsn": lsn
      }
    }
  }
}

Exemplo:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/mySqlServerStreamId:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "sqlServerLsnPosition": {
        "lsn": 0000123C:0000BA78:0004
      }
    }
  }
}

gcloud

Não é possível recuperar um fluxo usando gcloud.

Iniciar ou retomar uma transmissão de uma posição específica

É possível iniciar ou retomar um stream pausado de uma posição específica para fontes MySQL e Oracle. Isso pode ser útil quando você quer fazer um backfill usando uma ferramenta externa ou iniciar o CDC de uma posição indicada. Para uma fonte do MySQL, é necessário indicar uma posição de binlog ou um conjunto de GTID. Para uma fonte do Oracle, um número de alteração do sistema (SCN) no arquivo de redo log.

O código a seguir mostra uma solicitação para iniciar ou retomar um stream já criado de uma posição específica.

Iniciar ou retomar um stream de uma posição específica do binlog (MySQL):

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "NAME_OF_THE_LOG_FILE"
        "logPosition": POSITION
      }
    }
  }
}

Substitua:

  • NAME_OF_THE_LOG_FILE: o nome do arquivo de registro de que você quer iniciar o stream.
  • POSITION: a posição no arquivo de registro de onde você quer iniciar o stream. Se você não fornecer o valor, o Datastream começará a ler do início do arquivo.

Exemplo:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "binlog.001"
        "logPosition": 2
      }
    }
  }
}

gcloud

Não é possível iniciar ou retomar um stream de uma posição específica usando gcloud. Para informações sobre como usar gcloud para iniciar ou retomar um fluxo, consulte a documentação do SDK Cloud.

Iniciar ou retomar um fluxo de um conjunto de GTIDs específico (MySQL):

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlGtidPosition": {
        "gtidSet": "GTID_SET"
      }
    }
  }
}

Substitua GTID_SET por um ou mais GTIDs únicos ou intervalos de GTIDs de onde você quer iniciar ou retomar seu fluxo.

Exemplo:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlGtidPosition": {
        "gtidSet": "22cc56f5-3862-379a-9af5-051c59baef9d:1-561143685:561143688-591036613,b8d7df02-832b-32b9-bec7-2018806b76f6:3-7"
      }
    }
  }
}

gcloud

Não é possível iniciar ou retomar um stream de uma posição específica usando gcloud. Para informações sobre como usar gcloud para iniciar ou retomar um fluxo, consulte a documentação do SDK Cloud.

Inicie ou retome um stream de um número de alteração do sistema específico no arquivo de registro de refazer (Oracle):

REST

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": scn
      }
    }
  }
}
Substitua scn pelo número de alteração do sistema (SCN) no arquivo de registro de refazer de onde você quer iniciar seu fluxo. Este campo é obrigatório.

Exemplo:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": 123123
      }
    }
  }
}

gcloud

Não é possível iniciar ou retomar um stream de uma posição específica usando gcloud. Para informações sobre como usar gcloud para iniciar um stream, consulte a documentação do SDK Cloud.

Modificar um stream

O código a seguir mostra uma solicitação para atualizar a configuração de rotação de arquivos de um fluxo para alterná-lo a cada 75 MB ou 45 segundos.

Neste exemplo, os campos especificados para o parâmetro updateMask incluem os campos fileRotationMb e fileRotationInterval, representados pelas sinalizações destinationConfig.gcsDestinationConfig.fileRotationMb e destinationConfig.gcsDestinationConfig.fileRotationInterval, respectivamente.

REST

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig.
fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "fileRotationMb": 75,
      "fileRotationInterval": 45
    }
  }
}

O código a seguir mostra uma solicitação para incluir um arquivo de esquema de tipos unificados no caminho dos arquivos que o Datastream grava no Cloud Storage. Como resultado, o Datastream grava dois arquivos: um arquivo de dados JSON e um arquivo de esquema Avro.

Neste exemplo, o campo especificado é jsonFileFormat, representado pela sinalização destinationConfig.gcsDestinationConfig.jsonFileFormat.

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig.
jsonFileFormat
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "jsonFileFormat" {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      }  
    }
  }
}

O código a seguir mostra uma solicitação para que o Datastream replique dados existentes, além de alterações contínuas nos dados, do banco de dados de origem para o destino.

A seção oracleExcludedObjects do código mostra as tabelas e esquemas que não podem ser preenchidos no destino.

Neste exemplo, todas as tabelas e esquemas serão preenchidos, exceto a tableA no schema3.

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=backfillAll
{
  "backfillAll": {
    "oracleExcludedObjects": {
      "oracleSchemas": [
        {
          "schema": "schema3",
          "oracleTables": [
            { "table": "tableA" }
          ]
        }
      ]
    }
  }
}  

gcloud

Para mais informações sobre como usar gcloud para modificar seu stream, consulte a documentação do SDK Google Cloud.

Iniciar o preenchimento de um objeto de um fluxo

Um stream no Datastream pode preencher dados históricos e fazer streaming de alterações em andamento para um destino. As alterações em andamento serão sempre transmitidas de uma origem para um destino. No entanto, é possível especificar se você quer que os dados históricos sejam transmitidos.

Se você quiser que os dados históricos sejam transmitidos da origem para o destino, use o parâmetro backfillAll.

O Datastream também permite transmitir dados históricos apenas para tabelas de banco de dados específicas. Para fazer isso, use o parâmetro backfillAll e exclua as tabelas para as quais você não quer dados históricos.

Se você quiser que apenas as alterações em andamento sejam transmitidas no destino, use o parâmetro backfillNone. Se você quiser que o Datastream transmita um snapshot de todos os dados existentes da origem para o destino, inicie o preenchimento manualmente para os objetos que contêm esses dados.

Outro motivo para iniciar o preenchimento de um objeto é se os dados não estiverem sincronizados entre a origem e o destino. Por exemplo, um usuário pode excluir dados no destino acidentalmente, e os dados serão perdidos. Nesse caso, o preenchimento do objeto será iniciado como um "mecanismo de redefinição", porque todos os dados são transmitidos para o destino em uma única captura. Como resultado, os dados são sincronizados entre a origem e o destino.

Antes de iniciar o preenchimento de um objeto de um stream, você precisa recuperar informações sobre o objeto.

Cada objeto tem um OBJECT_ID, que identifica o objeto de maneira exclusiva. Use o OBJECT_ID para iniciar o preenchimento do stream.

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects/OBJECT_ID:startBackfillJob

gcloud

Para mais informações sobre como usar gcloud para iniciar o backfill de um objeto do seu stream, consulte a documentação do SDK Google Cloud.

Interromper o preenchimento de um objeto de um fluxo

Depois de iniciar o preenchimento de um objeto de um stream, você pode interromper o preenchimento do objeto. Por exemplo, se um usuário modificar um esquema de banco de dados, o esquema ou os dados poderão estar corrompidos. Você não quer que o esquema ou os dados sejam transmitidos para o destino, então o preenchimento do objeto é interrompido.

Também é possível interromper o preenchimento de um objeto para fins de balanceamento de carga. O Datastream pode executar vários preenchimentos em paralelo. Isso pode sobrecarregar a origem. Se a carga for significativa, interrompa o preenchimento de cada objeto e, em seguida, inicie o preenchimento para os objetos, um por um.

Antes de interromper o preenchimento de um objeto de um stream, é preciso fazer uma solicitação para recuperar informações sobre todos os objetos de um stream. Cada objeto retornado tem um OBJECT_ID, que identifica o objeto de maneira exclusiva. Use o OBJECT_ID para interromper o preenchimento do stream.

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects/OBJECT_ID:stopBackfillJob

gcloud

Para mais informações sobre como usar gcloud para interromper o backfill de um objeto do seu stream, consulte a documentação do SDK Google Cloud.

Mudar o número máximo de tarefas simultâneas do CDC

O código a seguir mostra como definir o número máximo de tarefas simultâneas de captura de dados alterados (CDC) para um stream do MySQL como 7.

Neste exemplo, o campo especificado no parâmetro updateMask é maxConcurrentCdcTasks. Ao definir o valor como 7, você muda o número máximo de tarefas simultâneas de CDC do valor anterior para 7. Você pode usar valores de 0 a 50 (inclusive). Se você não definir o valor ou se definir como 0, o padrão do sistema de cinco tarefas será definido para o fluxo.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentCdcTasks
{
  "sourceConfig": {
    "mysqlSourceConfig": {
      "maxConcurrentCdcTasks": "7"
      }
    }  
}

gcloud

Para mais informações sobre como usar gcloud, consulte a documentação do SDK Google Cloud.

Mudar o número máximo de tarefas de preenchimento simultâneas

O código a seguir mostra como definir o número máximo de tarefas simultâneas de backfill para um fluxo do MySQL como 25.

Neste exemplo, o campo especificado no parâmetro updateMask é maxConcurrentBackfillTasks. Ao definir o valor como 25, você muda o número máximo de tarefas simultâneas de backfill do valor anterior para 25. Você pode usar valores de 0 a 50 (inclusive). Se você não definir o valor ou se definir como 0, o padrão do sistema de 16 tarefas será definido para o stream.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/
streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentBackfillTasks
{
  "sourceConfig": {
    "mysqlSourceConfig": {
      "maxConcurrentBackfillTasks": "25"
      }
    }  
}

gcloud

Para mais informações sobre como usar gcloud, consulte a documentação do SDK Google Cloud.

Ativar o streaming de objetos grandes para fontes do Oracle

É possível ativar o streaming de objetos grandes, como objetos grandes binários (BLOB), objetos grandes de caracteres (CLOB) e objetos grandes de caracteres nacionais (NCLOB) para streams com fontes do Oracle. A flag streamLargeObjects permite incluir objetos grandes em fluxos novos e atuais. A flag é definida no nível do fluxo, e não é necessário especificar as colunas de tipos de dados de objetos grandes.

O exemplo a seguir mostra como criar um fluxo que permite transmitir objetos grandes.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleLobStream
{
  "displayName": "Oracle LOB stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/OracleCp",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1",
            "oracleTables": [
              {
                "table": "tableA",
                "oracleColumns": [
                  {
                    "column": "column1,column2"
                  }
                ]
              }
            ]
          }
        ]
      },
      "excludeObjects": {},
      "streamLargeObjects": {}
    }
  }
}

gcloud

Para mais informações sobre como usar gcloud para atualizar um fluxo, consulte a documentação do SDK Google Cloud.

Excluir um stream

O código a seguir mostra uma solicitação para excluir um stream.

REST

DELETE https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID

gcloud

Para mais informações sobre como usar gcloud para excluir seu stream, consulte a documentação do SDK Google Cloud.

A seguir