Monitorizar consultas continuas

Puedes monitorizar las consultas continuas de BigQuery con las siguientes herramientas de BigQuery:

Debido a la larga duración de una consulta continua de BigQuery, las métricas que se suelen generar al completar una consulta de SQL pueden no estar presentes o ser imprecisas.

Usar INFORMATION_SCHEMA vistas

Puedes usar varias vistas de INFORMATION_SCHEMA para monitorizar las consultas continuas y las reservas de consultas continuas.

Ver detalles de un trabajo

Puedes usar la vista JOBS para obtener metadatos de trabajos de consulta continua.

La siguiente consulta devuelve los metadatos de todas las consultas continuas activas. Los metadatos incluyen la marca de tiempo de la marca de agua de salida, que representa el punto hasta el que la consulta continua ha procesado los datos correctamente.

  1. En la Google Cloud consola, ve a la página BigQuery.

    Ir a BigQuery

  2. En el editor de consultas, ejecuta la siguiente consulta:

    SELECT
      start_time,
      job_id,
      user_email,
      query,
      state,
      reservation_id,
      continuous_query_info.output_watermark
    FROM `PROJECT_ID.region-REGION.INFORMATION_SCHEMA.JOBS`
    WHERE
      creation_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 day)
      AND continuous IS TRUE
      AND state = "RUNNING"
    ORDER BY
      start_time DESC

    Haz los cambios siguientes:

Ver los detalles de la asignación de reserva

Puede usar las vistas ASSIGNMENTS y RESERVATIONS para obtener detalles de asignación de reservas de consultas continuas.

Devuelve los detalles de asignación de reserva de las consultas continuas:

  1. En la Google Cloud consola, ve a la página BigQuery.

    Ir a BigQuery

  2. En el editor de consultas, ejecuta la siguiente consulta:

    SELECT
      reservation.reservation_name,
      reservation.slot_capacity
    FROM
      `ADMIN_PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.ASSIGNMENTS`
        AS assignment
    INNER JOIN
      `ADMIN_PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.RESERVATIONS`
        AS reservation
      ON (assignment.reservation_name = reservation.reservation_name)
    WHERE
      assignment.assignee_id = 'PROJECT_ID'
      AND job_type = 'CONTINUOUS';

    Haz los cambios siguientes:

    • ADMIN_PROJECT_ID: el ID del proyecto de administración que tiene la reserva.
    • LOCATION: la ubicación de la reserva.
    • PROJECT_ID: ID del proyecto asignado a la reserva. Solo se devuelve información sobre las consultas continuas que se ejecutan en este proyecto.

Ver información sobre el consumo de ranuras

Puede usar las vistas ASSIGNMENTS, RESERVATIONS y JOBS_TIMELINE para obtener información continua sobre el consumo de ranuras de consulta.

Devuelve información sobre el consumo de espacios de consultas continuas:

  1. En la Google Cloud consola, ve a la página BigQuery.

    Ir a BigQuery

  2. En el editor de consultas, ejecuta la siguiente consulta:

    SELECT
      jobs.period_start,
      reservation.reservation_name,
      reservation.slot_capacity,
      SUM(jobs.period_slot_ms) / 1000 AS consumed_total_slots
    FROM
      `ADMIN_PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.ASSIGNMENTS`
        AS assignment
    INNER JOIN
      `ADMIN_PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.RESERVATIONS`
        AS reservation
      ON (assignment.reservation_name = reservation.reservation_name)
    INNER JOIN
      `PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.JOBS_TIMELINE` AS jobs
      ON (
        UPPER(CONCAT('ADMIN_PROJECT_ID:LOCATION.', assignment.reservation_name))
        = UPPER(jobs.reservation_id))
    WHERE
      assignment.assignee_id = 'PROJECT_ID'
      AND assignment.job_type = 'CONTINUOUS'
      AND jobs.period_start
        BETWEEN TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
        AND CURRENT_TIMESTAMP()
    GROUP BY 1, 2, 3
    ORDER BY jobs.period_start DESC;

    Haz los cambios siguientes:

    • ADMIN_PROJECT_ID: el ID del proyecto de administración que tiene la reserva.
    • LOCATION: la ubicación de la reserva.
    • PROJECT_ID: ID del proyecto asignado a la reserva. Solo se devuelve información sobre las consultas continuas que se ejecutan en este proyecto.

También puedes monitorizar las reservas de consultas continuas con otras herramientas, como el explorador de métricas y los gráficos de recursos administrativos. Para obtener más información, consulta Monitorizar reservas de BigQuery.

Usar el gráfico de ejecución de consultas

Puede usar el gráfico de ejecución de consultas para obtener estadísticas generales y de rendimiento de una consulta continua. Para obtener más información, consulta el artículo Ver estadísticas de rendimiento de las consultas.

Ver el historial de tareas

Puedes ver los detalles de los trabajos de consulta continua en tu historial de trabajos personal o en el historial de trabajos del proyecto. Para obtener más información, consulta Ver detalles de un trabajo.

Ten en cuenta que la lista histórica de trabajos se ordena por la hora de inicio del trabajo, por lo que es posible que las consultas continuas que se hayan estado ejecutando durante un tiempo no estén cerca del inicio de la lista.

Usar el explorador de tareas administrativas

En el explorador de tareas administrativas, filtra las tareas para mostrar las consultas continuas. Para ello, selecciona Consulta continua en el filtro Categoría de tarea.

Usa Cloud Monitoring

Puedes ver métricas específicas de las consultas continuas de BigQuery mediante Cloud Monitoring. Para obtener más información, consulta el artículo sobre cómo crear paneles de control, gráficos y alertas y lee sobre las métricas disponibles para la visualización.

Alertas sobre consultas fallidas

En lugar de comprobar periódicamente si tus consultas continuas han fallado, puede ser útil crear una alerta para que te avise si se produce un error. Una forma de hacerlo es crear una métrica basada en registros de Cloud Logging personalizada con un filtro para tus tareas y una política de alertas de Cloud Monitoring basada en esa métrica:

  1. Cuando crees una consulta continua, usa un prefijo de ID de trabajo personalizado. Varias consultas continuas pueden compartir el mismo prefijo. Por ejemplo, puedes usar el prefijo prod- para indicar una consulta de producción.
  2. En la Google Cloud consola, ve a la página Métricas basadas en registros.

    Ir a Métricas basadas en registros

  3. Haz clic en Crear métrica. Aparecerá el panel Crear métrica de registros.

  4. En Tipo de métrica, selecciona Contador.

  5. En la sección Detalles, asigne un nombre a la métrica. Por ejemplo, CUSTOM_JOB_ID_PREFIX-metric.

  6. En la sección Selección de filtro, introduce lo siguiente en el editor Crear filtro:

    resource.type = "bigquery_project"
    protoPayload.resourceName : "projects/PROJECT_ID/jobs/CUSTOM_JOB_ID_PREFIX"
    severity = ERROR
    

    Haz los cambios siguientes:

  7. Haz clic en Crear métrica.

  8. En el menú de navegación, haga clic en Métricas basadas en registros. La métrica que acaba de crear aparece en la lista de métricas definidas por el usuario.

  9. En la fila de la métrica, haz clic en Más acciones y, a continuación, en Crear alerta a partir de la métrica.

  10. Haz clic en Siguiente. No es necesario que cambie la configuración predeterminada en la página Modo de configuración de la política.

  11. Haz clic en Siguiente. No es necesario que cambies la configuración predeterminada en la página Configurar activador de alerta.

  12. Selecciona los canales de notificaciones y escribe un nombre para la política de alertas.

  13. Haz clic en Crear política.

Para probar la alerta, ejecuta una consulta continua con el prefijo de ID de trabajo personalizado que has seleccionado y, a continuación, cancélala. La alerta puede tardar unos minutos en llegar a tu canal de notificaciones.

Reintentar consultas fallidas

Volver a intentar una consulta continua fallida puede ayudar a evitar situaciones en las que una canalización continua esté inactiva durante un periodo prolongado o requiera la intervención humana para reiniciarse. Estos son algunos aspectos importantes que debes tener en cuenta al reintentar una consulta continua fallida:

  • Indica si se puede volver a procesar una cantidad de datos procesados por la consulta anterior antes de que fallara.
  • Cómo gestionar la limitación de reintentos o usar un tiempo de espera exponencial.

Una posible estrategia para automatizar los reintentos de consultas es la siguiente:

  1. Crea un sumidero de Cloud Logging basado en un filtro de inclusión que cumpla los siguientes criterios para enrutar los registros a un tema de Pub/Sub:

    resource.type = "bigquery_project"
    protoPayload.resourceName : "projects/PROJECT_ID/jobs/CUSTOM_JOB_ID_PREFIX"
    severity = ERROR
    

    Haz los cambios siguientes:

  2. Crea una función de Cloud Run que se active en respuesta a los registros recibidos por Pub/Sub que coincidan con tu filtro.

    La función de Cloud Run podría aceptar la carga útil de datos del mensaje de Pub/Sub e intentar iniciar una nueva consulta continua con la misma sintaxis SQL que la consulta fallida, pero justo después de que se detuviera la tarea anterior.

Por ejemplo, puedes usar una función similar a la siguiente:

Python

Antes de probar este ejemplo, sigue las Pythoninstrucciones de configuración de la guía de inicio rápido de BigQuery con bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API Python de BigQuery.

Para autenticarte en BigQuery, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación para bibliotecas de cliente.

import base64
import json
import logging
import re
import uuid

import google.auth
import google.auth.transport.requests
import requests


def retry_continuous_query(event, context):
    logging.info("Cloud Function started.")

    if "data" not in event:
        logging.info("No data in Pub/Sub message.")
        return

    try:
        # Decode and parse the Pub/Sub message data
        log_entry = json.loads(base64.b64decode(event["data"]).decode("utf-8"))

        # Extract the SQL query and other necessary data
        proto_payload = log_entry.get("protoPayload", {})
        metadata = proto_payload.get("metadata", {})
        job_change = metadata.get("jobChange", {})
        job = job_change.get("job", {})
        job_config = job.get("jobConfig", {})
        query_config = job_config.get("queryConfig", {})
        sql_query = query_config.get("query")
        job_stats = job.get("jobStats", {})
        end_timestamp = job_stats.get("endTime")
        failed_job_id = job.get("jobName")

        # Check if required fields are missing
        if not all([sql_query, failed_job_id, end_timestamp]):
            logging.error("Required fields missing from log entry.")
            return

        logging.info(f"Retrying failed job: {failed_job_id}")

        # Adjust the timestamp in the SQL query
        timestamp_match = re.search(
            r"\s*TIMESTAMP\(('.*?')\)(\s*\+ INTERVAL 1 MICROSECOND)?", sql_query
        )

        if timestamp_match:
            original_timestamp = timestamp_match.group(1)
            new_timestamp = f"'{end_timestamp}'"
            sql_query = sql_query.replace(original_timestamp, new_timestamp)
        elif "CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE" in sql_query:
            new_timestamp = f"TIMESTAMP('{end_timestamp}') + INTERVAL 1 MICROSECOND"
            sql_query = sql_query.replace(
                "CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE", new_timestamp
            )

        # Get access token
        credentials, project = google.auth.default(
            scopes=["https://www.googleapis.com/auth/cloud-platform"]
        )
        request = google.auth.transport.requests.Request()
        credentials.refresh(request)
        access_token = credentials.token

        # API endpoint
        url = f"https://bigquery.googleapis.com/bigquery/v2/projects/{project}/jobs"

        # Request headers
        headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        }

        # Generate a random UUID
        random_suffix = str(uuid.uuid4())[:8]  # Take the first 8 characters of the UUID

        # Combine the prefix and random suffix
        job_id = f"CUSTOM_JOB_ID_PREFIX{random_suffix}"

        # Request payload
        data = {
            "configuration": {
                "query": {
                    "query": sql_query,
                    "useLegacySql": False,
                    "continuous": True,
                    "connectionProperties": [
                        {"key": "service_account", "value": "SERVICE_ACCOUNT"}
                    ],
                    # ... other query parameters ...
                },
                "labels": {"bqux_job_id_prefix": "CUSTOM_JOB_ID_PREFIX"},
            },
            "jobReference": {
                "projectId": project,
                "jobId": job_id,  # Use the generated job ID here
            },
        }

        # Make the API request
        response = requests.post(url, headers=headers, json=data)

        # Handle the response
        if response.status_code == 200:
            logging.info("Query job successfully created.")
        else:
            logging.error(f"Error creating query job: {response.text}")

    except Exception as e:
        logging.error(
            f"Error processing log entry or retrying query: {e}", exc_info=True
        )

    logging.info("Cloud Function finished.")

Siguientes pasos