搭配使用 BigQuery 和 Ray on Vertex AI

在 Vertex AI 上執行 Ray 應用程式時,您可以使用 BigQuery 做為雲端資料庫。本節將說明如何從 Vertex AI 的 Ray 叢集讀取及寫入 BigQuery 資料庫。本節的步驟假設您使用的是 Python 適用的 Vertex AI SDK。

如果您想從 BigQuery 資料集讀取資料,請建立新的 BigQuery 資料集或使用現有資料集。

匯入並初始化 Ray on Vertex AI 用戶端

如果您已連線至 Vertex AI 上的 Ray 叢集,請重新啟動核心並執行下列程式碼。如要執行 BigQuery 指令,必須在連線時使用 runtime_env 變數。

import ray
from google.cloud import aiplatform

# The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster.
address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME)

runtime_env = {
    "pip":
       ["google-cloud-aiplatform[ray]","ray==2.42.0"]
  }

ray.init(address=address, runtime_env=runtime_env)

讀取 BigQuery 資料

讀取 BigQuery 資料集的資料。讀取作業必須在 Ray 工作中執行。

aiplatform.init(project=PROJECT_ID, location=LOCATION)

@ray.remote
def run_remotely():
    import vertex_ray
    dataset = DATASET
    parallelism = PARALLELISM
    query = QUERY

    ds = vertex_ray.data.read_bigquery(
        dataset=dataset,
        parallelism=parallelism,
        query=query
    )
    ds.materialize()

其中:

  • PROJECT_ID: Google Cloud 專案 ID。您可以在 Google Cloud 控制台歡迎頁面中找到專案 ID。

  • LOCATION:儲存 Dataset 的位置。例如:us-central1

  • DATASET:BigQuery 資料集。格式必須為 dataset.table。如果提供查詢,請設為 None

  • PARALLELISM:整數,可影響同時建立的讀取工作數量。建立的讀取串流可能會比您要求的少。

  • QUERY:包含 SQL 查詢的字串,用於從 BigQuery 資料庫讀取資料。如果不需要查詢,請設為 None

轉換資料

使用 pyarrowpandas 更新及刪除 BigQuery 資料表中的資料列和資料欄。如果您想使用 pandas 轉換,建議您將輸入類型設為 pyarrow,並在使用者定義函式 (UDF) 中轉換為 pandas,這樣您就能在 UDF 中擷取任何 pandas 轉換類型錯誤。轉換作業必須在 Ray 工作中執行。

@ray.remote
def run_remotely():
    # BigQuery Read first
    import pandas as pd
    import pyarrow as pa

    def filter_batch(table: pa.Table) -> pa.Table:
        df = table.to_pandas(types_mapper={pa.int64(): pd.Int64Dtype()}.get)
        # PANDAS_TRANSFORMATIONS_HERE
        return pa.Table.from_pandas(df)

    ds = ds.map_batches(filter_batch, batch_format="pyarrow").random_shuffle()
    ds.materialize()

    # You can repartition before writing to determine the number of write blocks
    ds = ds.repartition(4)
    ds.materialize()

將資料寫入 BigQuery

將資料插入 BigQuery 資料集。寫入作業必須在 Ray 工作中執行。

@ray.remote
def run_remotely():
    # BigQuery Read and optional data transformation first
    dataset=DATASET
    vertex_ray.data.write_bigquery(
        ds,
        dataset=dataset
    )

其中:

  • DATASET:BigQuery 資料集。格式必須為 dataset.table

後續步驟