Plantilla de Pub/Sub de proto a BigQuery

La plantilla de proto de Pub/Sub a BigQuery es una canalización de transmisión que transfiere datos de proto desde una suscripción a Pub/Sub hacia una tabla de BigQuery. Cualquier error que ocurra mientras se escribe en la tabla de BigQuery se transmite a un tema de Pub/Sub sin procesar.

Se puede proporcionar una función definida por el usuario (UDF) de JavaScript para transformar los datos. Los errores mientras se ejecuta la UDF se pueden enviar a un tema de Pub/Sub separado o al mismo tema sin procesar que los errores de BigQuery.

Antes de ejecutar una canalización de Dataflow para este caso de uso, considera si una suscripción a BigQuery de Pub/Sub con una UDF cumple con tus requisitos.

Requisitos de la canalización

  • La suscripción de entrada de Pub/Sub debe existir.
  • El archivo de esquema de los registros proto debe existir en Cloud Storage.
  • El tema de Pub/Sub de salida debe existir.
  • El conjunto de datos de salida de BigQuery debe existir.
  • Si la tabla de BigQuery existe, debe tener un esquema que coincida con los datos del proto, sin importar el valor createDisposition.

Parámetros de la plantilla

Parámetros obligatorios

  • protoSchemaPath (ruta de Cloud Storage al archivo de esquema de Proto): Es la ruta de Cloud Storage a un archivo de conjunto de descriptores autónomo. Ejemplo: gs://MyBucket/schema.pb. schema.pb se puede generar agregando --descriptor_set_out=schema.pb al comando protoc que compila los protos. La marca --include_imports se puede usar para garantizar que el archivo sea autónomo.
  • fullMessageName (nombre completo del mensaje Proto): Es el nombre completo del mensaje (por ejemplo, package.name.MessageName). Si el mensaje está anidado dentro de otro mensaje, incluye todos los mensajes con el delimitador "." (por ejemplo, package.name.OuterMessage.InnerMessage). "package.name" debe provenir de la sentencia package, no de la sentencia java_package.
  • inputSubscription (suscripción de entrada de Pub/Sub): Suscripción a Pub/Sub desde la que se lee la entrada, en el formato “projects/your-project-id/subscriptions/your-subscription-name” (ejemplo: projects/your-project-id/subscriptions/your-subscription-name).
  • outputTableSpec (tabla de salida de BigQuery): Es la ubicación de la tabla de BigQuery en la que se escribirá el resultado. El nombre debe tener el formato <project>:<dataset>.<table_name>. El esquema de la tabla debe coincidir con los objetos de entrada.
  • outputTopic (tema de Pub/Sub de salida): Es el nombre del tema en el que se deben publicar los datos, en el formato “projects/your-project-id/topics/your-topic-name” (ejemplo: projects/your-project-id/topics/your-topic-name).

Parámetros opcionales

  • preserveProtoFieldNames (Conservar nombres de campos .proto): Es una marca para controlar si se deben conservar los nombres de los campos .proto o convertirlos a lowerCamelCase. Si la tabla ya existe, esto debe basarse en lo que coincida con el esquema de la tabla. De lo contrario, determinará los nombres de las columnas de la tabla creada. Es verdadero para conservar el formato snake_case de proto. Si es falso, los campos se convertirán a lowerCamelCase. (Valor predeterminado: false).
  • bigQueryTableSchemaPath (ruta de acceso al esquema de la tabla de BigQuery): Es la ruta de acceso de Cloud Storage al archivo JSON del esquema de BigQuery. Si no se configura, el esquema se infiere a partir del esquema de Proto. (Ejemplo: gs://MyBucket/bq_schema.json).
  • udfOutputTopic (tema de salida de Pub/Sub para errores de UDF): Es un tema de salida opcional para enviar errores de UDF. Si no se configura esta opción, los errores se escribirán en el mismo tema que los errores de BigQuery. (Ejemplo: projects/your-project-id/topics/your-topic-name).
  • writeDisposition (disposición de escritura que se usará para BigQuery): Es la disposición de escritura de BigQuery. Por ejemplo, WRITE_APPEND, WRITE_EMPTY o WRITE_TRUNCATE. La configuración predeterminada es: WRITE_APPEND.
  • createDisposition (Create Disposition to use for BigQuery): Es CreateDisposition de BigQuery. Por ejemplo, CREATE_IF_NEEDED, CREATE_NEVER. La configuración predeterminada es CREATE_IF_NEEDED.
  • javascriptTextTransformGcsPath (ruta de Cloud Storage a la fuente de UDF de JavaScript): Es el patrón de ruta de acceso de Cloud Storage para el código JavaScript que contiene las funciones definidas por el usuario. (Por ejemplo: gs://your-bucket/your-function.js).
  • javascriptTextTransformFunctionName (nombre de la función de JavaScript de la UDF): Es el nombre de la función a la que se llamará desde el archivo JavaScript. Usa solo letras, dígitos y guiones bajos. (Ejemplo: “transform” o “transform_udf1”).
  • javascriptTextTransformReloadIntervalMinutes (Intervalo de recarga automática de la UDF de JavaScript [minutos]): Define el intervalo que los trabajadores pueden verificar para detectar cambios en la UDF de JavaScript a fin de volver a cargar los archivos. La configuración predeterminada es 0.
  • useStorageWriteApi (Usa la API de BigQuery Storage Write): Si es verdadero, la canalización usa la API de Storage Write cuando escribe los datos en BigQuery (consulta https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api). El valor predeterminado es falso. Cuando usas la API de Storage Write en modo “exactamente una vez”, debes establecer los siguientes parámetros: “Cantidad de transmisiones para la API de BigQuery Storage Write” y “Frecuencia de activación en segundos para la API de BigQuery Storage Write”. Si habilitas el modo de al menos una vez de Dataflow o configuras el parámetro useStorageWriteApiAtLeastOnce como verdadero, no es necesario que establezcas la cantidad de transmisiones ni la frecuencia de activación.
  • useStorageWriteApiAtLeastOnce (Usar la semántica de al menos una vez en la API de BigQuery Storage Write): Este parámetro solo se aplica si “Usar la API de BigQuery Storage Write” está habilitada. Si se habilita, se usará la semántica de “al menos una vez” para la API de Storage Write; de lo contrario, se usará la semántica de “exactamente una vez”. La configuración predeterminada es "false".
  • numStorageWriteApiStreams (cantidad de transmisiones para la API de BigQuery Storage Write): La cantidad de transmisiones define el paralelismo de la transformación Write de BigQueryIO y corresponde, más o menos, a la cantidad de transmisiones de la API de Storage Write que usará la canalización. Consulta https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api para conocer los valores recomendados. La configuración predeterminada es 0.
  • storageWriteApiTriggeringFrequencySec (frecuencia de activación en segundos para la API de BigQuery Storage Write): La frecuencia de activación determinará qué tan pronto serán visibles los datos para las consultas en BigQuery. Consulta https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api para conocer los valores recomendados.

Función definida por el usuario

Para extender esta plantilla, puedes escribir una función definida por el usuario (UDF). La plantilla llama a la UDF para cada elemento de entrada. Las cargas útiles de elementos se serializan como cadenas JSON. Para obtener más información, consulta Crea funciones definidas por el usuario para plantillas de Dataflow.

Especificación de la función

La UDF tiene la siguiente especificación:

  • Entrada: el campo de datos del mensaje de Pub/Sub, serializado como una cadena JSON
  • Resultado: Una cadena JSON que coincide con el esquema de la tabla de destino de BigQuery.
  • Ejecuta la plantilla

    Console

    1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
    2. Ir a Crear un trabajo a partir de una plantilla
    3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
    4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es us-central1.

      Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

    5. En el menú desplegable Plantilla de Dataflow, selecciona the Pub/Sub Proto to BigQuery template.
    6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
    7. Haga clic en Ejecutar trabajo.

    gcloud

    En tu shell o terminal, ejecuta la plantilla:

    gcloud dataflow flex-template run JOB_NAME \
        --region=REGION_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Flex \
        --parameters \
    schemaPath=SCHEMA_PATH,\
    fullMessageName=PROTO_MESSAGE_NAME,\
    inputSubscription=SUBSCRIPTION_NAME,\
    outputTableSpec=BIGQUERY_TABLE,\
    outputTopic=UNPROCESSED_TOPIC
      

    Reemplaza lo siguiente:

    • JOB_NAME: Es el nombre del trabajo que elijas
    • REGION_NAME: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
    • VERSION: Es la versión de la plantilla que deseas usar.

      Puedes usar los siguientes valores:

      • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
      • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
    • SCHEMA_PATH: Es la ruta de acceso de Cloud Storage al archivo de esquema de Proto (por ejemplo, gs://MyBucket/file.pb).
    • PROTO_MESSAGE_NAME: Es el nombre del mensaje Proto (por ejemplo, package.name.MessageName).
    • SUBSCRIPTION_NAME: Es el nombre de la suscripción de entrada de Pub/Sub.
    • BIGQUERY_TABLE: Es el nombre de la tabla de salida de BigQuery.
    • UNPROCESSED_TOPIC: Es el tema de Pub/Sub que se usará para la cola no procesada.

    API

    Para ejecutar la plantilla con la API de REST, envía una solicitud POST HTTP. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Flex",
          "parameters": {
              "schemaPath": "SCHEMA_PATH",
              "fullMessageName": "PROTO_MESSAGE_NAME",
              "inputSubscription": "SUBSCRIPTION_NAME",
              "outputTableSpec": "BIGQUERY_TABLE",
              "outputTopic": "UNPROCESSED_TOPIC"
          }
       }
    }
      

    Reemplaza lo siguiente:

    • PROJECT_ID: El ID del proyecto Google Cloud en el que deseas ejecutar el trabajo de Dataflow
    • JOB_NAME: Es el nombre del trabajo que elijas
    • LOCATION: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
    • VERSION: Es la versión de la plantilla que deseas usar.

      Puedes usar los siguientes valores:

      • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
      • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
    • SCHEMA_PATH: Es la ruta de acceso de Cloud Storage al archivo de esquema de Proto (por ejemplo, gs://MyBucket/file.pb).
    • PROTO_MESSAGE_NAME: Es el nombre del mensaje Proto (por ejemplo, package.name.MessageName).
    • SUBSCRIPTION_NAME: Es el nombre de la suscripción de entrada de Pub/Sub.
    • BIGQUERY_TABLE: Es el nombre de la tabla de salida de BigQuery.
    • UNPROCESSED_TOPIC: Es el tema de Pub/Sub que se usará para la cola no procesada.

    ¿Qué sigue?