Menggunakan Ray di Vertex AI dengan BigQuery

Saat menjalankan aplikasi Ray di Vertex AI, gunakan BigQuery sebagai database cloud Anda. Bagian ini membahas cara membaca dan menulis ke database BigQuery dari cluster Ray Anda di Vertex AI. Langkah-langkah di bagian ini mengasumsikan bahwa Anda menggunakan Vertex AI SDK untuk Python.

Untuk membaca dari set data BigQuery, buat set data BigQuery baru atau gunakan set data yang ada.

Mengimpor dan melakukan inisialisasi klien Ray di Vertex AI

Jika Anda terhubung ke cluster Ray di Vertex AI, mulai ulang kernel dan jalankan kode berikut. Variabel runtime_env diperlukan pada waktu koneksi untuk menjalankan perintah BigQuery.

import ray
from google.cloud import aiplatform

# The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster.
address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME)

runtime_env = {
    "pip":
       ["google-cloud-aiplatform[ray]","ray==2.47.1"]
  }

ray.init(address=address, runtime_env=runtime_env)

Membaca data dari BigQuery

Membaca data dari set data BigQuery Anda. Tugas Ray harus melakukan operasi baca.

aiplatform.init(project=PROJECT_ID, location=LOCATION)

@ray.remote
def run_remotely():
    import vertex_ray
    dataset = DATASET
    parallelism = PARALLELISM
    query = QUERY

    ds = vertex_ray.data.read_bigquery(
        dataset=dataset,
        parallelism=parallelism,
        query=query
    )
    ds.materialize()

Dengan:

  • PROJECT_ID: Google Cloud project ID. Temukan project ID di halaman selamat datang konsol Google Cloud .

  • LOCATION: Lokasi tempat Dataset disimpan. Contoh, us-central1.

  • DATASET: Set data BigQuery. Harus dalam format dataset.table. Tetapkan ke None jika Anda memberikan kueri.

  • PARALLELISM: Bilangan bulat yang memengaruhi jumlah tugas baca yang dibuat secara paralel. Mungkin ada lebih sedikit aliran data baca yang dibuat daripada yang Anda minta.

  • QUERY: String yang berisi kueri SQL untuk dibaca dari database BigQuery. Tetapkan ke None jika tidak ada kueri yang diperlukan.

Mentransformasi data

Perbarui dan hapus baris dan kolom dari tabel BigQuery menggunakan pyarrow atau pandas. Jika Anda ingin menggunakan transformasi pandas, pertahankan jenis input sebagai pyarrow dan konversi ke pandas dalam fungsi yang ditentukan pengguna (UDF) sehingga Anda dapat menangkap error jenis konversi pandas dalam UDF. Ray Task harus melakukan transformasi.

@ray.remote
def run_remotely():
    # BigQuery Read first
    import pandas as pd
    import pyarrow as pa

    def filter_batch(table: pa.Table) -> pa.Table:
        df = table.to_pandas(types_mapper={pa.int64(): pd.Int64Dtype()}.get)
        # PANDAS_TRANSFORMATIONS_HERE
        return pa.Table.from_pandas(df)

    ds = ds.map_batches(filter_batch, batch_format="pyarrow").random_shuffle()
    ds.materialize()

    # You can repartition before writing to determine the number of write blocks
    ds = ds.repartition(4)
    ds.materialize()

Menulis data ke BigQuery

Menyisipkan data ke set data BigQuery. Ray Task harus melakukan penulisan.

@ray.remote
def run_remotely():
    # BigQuery Read and optional data transformation first
    dataset=DATASET
    vertex_ray.data.write_bigquery(
        ds,
        dataset=dataset
    )

Dengan:

  • DATASET: Set data BigQuery. Set data harus dalam format dataset.table.

Langkah berikutnya