Modelo do Pub/Sub para MongoDB

O modelo do Pub/Sub para MongoDB é um pipeline de streaming que lê mensagens JSON codificadas de uma assinatura do Pub/Sub e grava no MongoDB como documentos. Caso seja necessário, o pipeline é compatível com transformações extras que podem ser incluídas usando uma função definida pelo usuário (UDF) do JavaScript.

Se ocorrerem erros durante o processamento de registros, o modelo os gravará em uma tabela do BigQuery com a mensagem de entrada. Por exemplo, erros podem ocorrer devido a incompatibilidade de esquema, JSON malformado ou ao executar transformações. Especifique o nome da tabela no parâmetro deadletterTable. Se a tabela não existir, o pipeline a criará automaticamente.

Requisitos de pipeline

  • A assinatura do Pub/Sub precisa existir e as mensagens precisam ser codificadas em um formato JSON válido.
  • O cluster do MongoDB precisa existir e poder ser acessado das máquinas de trabalho do Dataflow.

Parâmetros do modelo

Parâmetros obrigatórios

  • inputSubscription: nome da assinatura do Pub/Sub Por exemplo: projects/your-project-id/subscriptions/your-subscription-name.
  • mongoDBUri: lista separada por vírgulas dos servidores MongoDB. Exemplo: host1:port,host2:port,host3:port.
  • database: banco de dados no MongoDB para armazenar a coleção. Exemplo: my-db.
  • collection: o nome da coleção no banco de dados do MongoDB. Exemplo: my-collection.
  • deadletterTable: a tabela do BigQuery que armazena mensagens causadas por falhas, como esquema incompatível, JSON incorreto e assim por diante. (Exemplo: id-do-projeto:seu-conjunto-de-dados.nome-da-tabela).

Parâmetros opcionais

  • batchSize: tamanho do lote usado para a inserção de documentos em lote no MongoDB. O padrão é: 1000.
  • batchSizeBytes: tamanho do lote em bytes. O padrão é 5242880.
  • maxConnectionIdleTime: tempo máximo de inatividade permitido em segundos antes que o tempo limite da conexão ocorra. O padrão é 60000.
  • sslEnabled: valor booleano que indica se a conexão com o MongoDB é ativada para SSL. O padrão é "true".
  • ignoreSSLCertificate: valor booleano que indica se o certificado SSL será ignorado. O padrão é "true".
  • withOrdered: valor booleano que permite inserções ordenadas em massa para o MongoDB. O padrão é "true".
  • withSSLInvalidHostNameAllowed: valor booleano que indica se um nome de host inválido é permitido para a conexão SSL. O padrão é "true".
  • javascriptTextTransformGcsPath: o URI do Cloud Storage do arquivo .js que define a função JavaScript definida pelo usuário (UDF) a ser usada. Exemplo: gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName: o nome da função JavaScript definida pelo usuário (UDF) a ser usada. Por exemplo, se o código de função do JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função será myTransform. Para ver exemplos de UDFs em JavaScript, consulte os exemplos de UDF (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes: especifica a frequência de recarregamento da UDF em minutos Se o valor for maior que 0, o Dataflow vai verificar periodicamente o arquivo da UDF no Cloud Storage e vai atualizar a UDF se o arquivo for modificado. Com esse parâmetro, é possível atualizar a UDF enquanto o pipeline está em execução, sem precisar reiniciar o job. Se o valor for 0, o recarregamento da UDF será desativado. O valor padrão é 0.

Função definida pelo usuário

Também é possível estender esse modelo escrevendo uma função definida pelo usuário (UDF). O modelo chama a UDF para cada elemento de entrada. Os payloads dos elementos são serializados como strings JSON. Para mais informações, consulte Criar funções definidas pelo usuário para modelos do Dataflow.

Especificação da função

A UDF tem a seguinte especificação:

  • Entrada: uma única linha de um arquivo CSV de entrada.
  • Saída: um documento JSON em string para inserir no MongoDB.

Executar o modelo

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Pub/Sub to MongoDB template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_PubSub_to_MongoDB \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,
collection=COLLECTION,
deadletterTable=UNPROCESSED_TABLE
  

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • REGION_NAME: a região em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • INPUT_SUBSCRIPTION: a assinatura do Pub/Sub (por exemplo, projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI: os endereços de servidor do MongoDB (por exemplo, 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE: o nome do banco de dados do MongoDB (por exemplo, users)
  • COLLECTION: o nome da coleção do MongoDB (por exemplo, profiles)
  • UNPROCESSED_TABLE: o nome da tabela do BigQuery (por exemplo, your-project:your-dataset.your-table-name)

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "mongoDBUri": "MONGODB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "deadletterTable": "UNPROCESSED_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_PubSub_to_MongoDB",
   }
}
  

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • LOCATION: a região em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • INPUT_SUBSCRIPTION: a assinatura do Pub/Sub (por exemplo, projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI: os endereços de servidor do MongoDB (por exemplo, 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE: o nome do banco de dados do MongoDB (por exemplo, users)
  • COLLECTION: o nome da coleção do MongoDB (por exemplo, profiles)
  • UNPROCESSED_TABLE: o nome da tabela do BigQuery (por exemplo, your-project:your-dataset.your-table-name)

A seguir