Plantilla de suscripción de Pub/Sub a BigQuery

La plantilla de suscripción de Pub/Sub a BigQuery es un flujo de procesamiento en streaming que lee mensajes con formato JSON de una suscripción de Pub/Sub y los escribe en una tabla de BigQuery. Puedes usar la plantilla como solución rápida para transferir datos de Pub/Sub a BigQuery. La plantilla lee mensajes con formato JSON de Pub/Sub y los convierte en elementos de BigQuery.

Antes de ejecutar una canalización de Dataflow para este caso, plantéate si una suscripción de Pub/Sub a BigQuery con una UDF se ajusta a tus necesidades.

Requisitos del flujo de procesamiento

  • El campo data de los mensajes de Pub/Sub debe usar el formato JSON, que se describe en esta guía de JSON. Por ejemplo, los mensajes con valores en el campo data con el formato {"k1":"v1", "k2":"v2"} se pueden insertar en una tabla de BigQuery con dos columnas, llamadas k1 y k2, con un tipo de datos de cadena.
  • La tabla de salida debe existir antes de ejecutar el flujo de procesamiento. El esquema de la tabla debe coincidir con los objetos JSON de entrada.

Parámetros de plantilla

Parámetros obligatorios

  • outputTableSpec ubicación de la tabla de salida de BigQuery, en formato <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>.
  • inputSubscription suscripción de entrada de Pub/Sub desde la que se leerán los datos, con el formato projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION>.

Parámetros opcionales

  • outputDeadletterTable la tabla de BigQuery que se va a usar para los mensajes que no se puedan enviar a la tabla de salida, en el formato <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>. Si la tabla no existe, se crea durante la ejecución de la canalización. Si no se especifica, se usa OUTPUT_TABLE_SPEC_error_records.
  • javascriptTextTransformGcsPath el URI de Cloud Storage del archivo .js que define la función de JavaScript definida por el usuario (UDF) que se va a usar. Por ejemplo, gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName nombre de la función definida por el usuario (UDF) de JavaScript que se va a usar. Por ejemplo, si el código de la función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDFs de JavaScript, consulta Ejemplos de UDFs (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes define el intervalo en el que los workers pueden comprobar si hay cambios en las UDFs de JavaScript para volver a cargar los archivos. El valor predeterminado es 0.

Función definida por el usuario

También puedes ampliar esta plantilla escribiendo una función definida por el usuario (UDF). La plantilla llama a la función definida por el usuario para cada elemento de entrada. Las cargas útiles de los elementos se serializan como cadenas JSON. Para obtener más información, consulta el artículo sobre cómo crear funciones definidas por el usuario para plantillas de Dataflow.

Especificación de la función

La función definida por el usuario tiene las siguientes especificaciones:

  • Entrada: el campo de datos del mensaje de Pub/Sub, serializado como una cadena JSON.
  • Salida: una cadena JSON que coincide con el esquema de la tabla de destino de BigQuery.
  • Ejecutar la plantilla

    Consola

    1. Ve a la página Crear tarea a partir de plantilla de Dataflow.
    2. Ir a Crear tarea a partir de plantilla
    3. En el campo Nombre de la tarea, introduce un nombre único.
    4. Opcional: En Endpoint regional, seleccione un valor en el menú desplegable. La región predeterminada es us-central1.

      Para ver una lista de las regiones en las que puedes ejecutar una tarea de Dataflow, consulta Ubicaciones de Dataflow.

    5. En el menú desplegable Plantilla de flujo de datos, seleccione the Pub/Sub Subscription to BigQuery template.
    6. En los campos de parámetros proporcionados, introduzca los valores de los parámetros.
    7. Opcional: Para cambiar del procesamiento una sola vez al modo de streaming al menos una vez, selecciona Al menos una vez.
    8. Haz clic en Ejecutar trabajo.

    gcloud

    En tu shell o terminal, ejecuta la plantilla:

    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/PubSub_Subscription_to_BigQuery \
        --region REGION_NAME \
        --staging-location STAGING_LOCATION \
        --parameters \
    inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
    outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
    outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

    Haz los cambios siguientes:

    • JOB_NAME: un nombre de trabajo único que elijas
    • REGION_NAME: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo, us-central1
    • VERSION: la versión de la plantilla que quieres usar

      Puedes usar los siguientes valores:

      • latest para usar la última versión de la plantilla, que está disponible en la carpeta principal sin fecha del contenedor: gs://dataflow-templates-REGION_NAME/latest/
      • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se encuentra anidada en la carpeta principal correspondiente con la fecha en el bucket: gs://dataflow-templates-REGION_NAME/
    • STAGING_LOCATION: la ubicación de los archivos locales de almacenamiento provisional (por ejemplo, gs://your-bucket/staging)
    • SUBSCRIPTION_NAME: nombre de tu suscripción de Pub/Sub
    • DATASET: tu conjunto de datos de BigQuery
    • TABLE_NAME: nombre de la tabla de BigQuery

    API

    Para ejecutar la plantilla mediante la API REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus ámbitos de autorización, consulta projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/PubSub_Subscription_to_BigQuery
    {
       "jobName": "JOB_NAME",
       "parameters": {
           "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
           "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
       },
       "environment": {
           "ipConfiguration": "WORKER_IP_UNSPECIFIED",
           "additionalExperiments": []
       },
    }

    Haz los cambios siguientes:

    • PROJECT_ID: el ID del proyecto Google Cloud en el que quieres ejecutar la tarea de Dataflow
    • JOB_NAME: un nombre de trabajo único que elijas
    • LOCATION: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo, us-central1
    • VERSION: la versión de la plantilla que quieres usar

      Puedes usar los siguientes valores:

      • latest para usar la última versión de la plantilla, que está disponible en la carpeta principal sin fecha del contenedor: gs://dataflow-templates-REGION_NAME/latest/
      • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se encuentra anidada en la carpeta principal correspondiente con la fecha en el bucket: gs://dataflow-templates-REGION_NAME/
    • STAGING_LOCATION: la ubicación de los archivos locales de almacenamiento provisional (por ejemplo, gs://your-bucket/staging)
    • SUBSCRIPTION_NAME: nombre de tu suscripción de Pub/Sub
    • DATASET: tu conjunto de datos de BigQuery
    • TABLE_NAME: nombre de la tabla de BigQuery

    Siguientes pasos