Usa Ray en Vertex AI con BigQuery

Cuando ejecutas una aplicación de Ray en Vertex AI, puedes usar BigQuery como tu base de datos en la nube. En esta sección, se explica cómo leer y escribir en una base de datos de BigQuery desde tu Ray en el clúster de Vertex AI. En los pasos de esta sección, se supone que estás usando el SDK de Vertex AI para Python.

Si deseas leer desde un conjunto de datos de BigQuery, debes crear un conjunto de datos nuevo de BigQuery o usar un conjunto de datos existente.

Importa e inicializa el cliente de Ray en Vertex AI

Si ya estás conectado a tu Ray en Vertex AI clúster, reinicia tu kernel y ejecuta el siguiente código. La variable runtime_env es necesaria en el momento de la conexión para ejecutar comandos de BigQuery.

import ray
from google.cloud import aiplatform

# The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster.
address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME)

runtime_env = {
    "pip":
       ["google-cloud-aiplatform[ray]","ray==2.9.3"]
  }

ray.init(address=address, runtime_env=runtime_env)

Leer datos desde BigQuery

Lee datos de tu conjunto de datos de BigQuery. La lectura debe realizarse en una tarea de Ray.

aiplatform.init(project=PROJECT_ID, location=LOCATION)

@ray.remote
def run_remotely():
    import vertex_ray
    dataset = DATASET
    parallelism = PARALLELISM
    query = QUERY

    ds = vertex_ray.data.read_bigquery(
        dataset=dataset,
        parallelism=parallelism,
        query=query
    )
    ds.materialize()

Aquí:

  • PROJECT_ID: ID del proyecto de Google Cloud Puedes encontrar el ID del proyecto en la página de bienvenida de la consola de Google Cloud.

  • LOCATION: Es la ubicación en la que se almacena Dataset. Por ejemplo, us-central1.

  • DATASET: conjunto de datos de BigQuery Debe tener el formato dataset.table. Se establece en None si se proporciona una consulta.

  • PARALLELISM: un integer que influye en cuántas tareas de lectura se crean en paralelo. Es posible que haya menos transmisiones de lectura creadas de las que solicitaste.

  • QUERY: una string que contiene una consulta de SQL para leer desde la base de datos de BigQuery. Se establece en None si no se requiere ninguna consulta.

Transforma los datos

Actualiza y borra filas y columnas de tus tablas de BigQuery con pyarrow o pandas. Si deseas usar las transformaciones de pandas, se recomienda que mantengas el tipo de entrada como pyarrow y conviertas a pandas en la función definida por el usuario (UDF) para poder detectar cualquier error de tipo de conversión de pandas en el UDF. La transformación debe realizarse en una tarea de Ray.

@ray.remote
def run_remotely():
    # BigQuery Read first
    import pandas as pd
    import pyarrow as pa

    def filter_batch(table: pa.Table) -> pa.Table:
        df = table.to_pandas(types_mapper={pa.int64(): pd.Int64Dtype()}.get)
        # PANDAS_TRANSFORMATIONS_HERE
        return pa.Table.from_pandas(df)

    ds = ds.map_batches(filter_batch, batch_format="pyarrow").random_shuffle()
    ds.materialize()

    # You can repartition before writing to determine the number of write blocks
    ds = ds.repartition(4)
    ds.materialize()

Escribir datos a BigQuery

Inserta datos en tu conjunto de datos de BigQuery. La escritura debe realizarse en una tarea de Ray.

@ray.remote
def run_remotely():
    # BigQuery Read and optional data transformation first
    dataset=DATASET
    vertex_ray.data.write_bigquery(
        ds,
        dataset=dataset
    )

Aquí:

  • DATASET: conjunto de datos de BigQuery Debe tener el formato dataset.table.

¿Qué sigue?