Use Ray on Vertex AI with BigQuery
When you run a Ray application on Vertex AI, use
BigQuery as your cloud database. This
section covers how to read from and write to a BigQuery database from
your Ray cluster on Vertex AI.
The steps in this section assume that you use
the Vertex AI SDK for Python.
To read from a BigQuery dataset, create a new BigQuery dataset or use an existing dataset.
Import and initialize Ray on Vertex AI client
If you're connected to your Ray cluster on Vertex AI, restart your
kernel and run the following code. The runtime_env variable is necessary at
connection time to run BigQuery commands.
import ray
from google.cloud import aiplatform

# The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster.
address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME)

runtime_env = {
    "pip": ["google-cloud-aiplatform[ray]", "ray==2.47.1"]
}

ray.init(address=address, runtime_env=runtime_env)
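Optionally, you can confirm that the client is connected to the remote cluster before issuing BigQuery commands. A minimal check using Ray's standard ray.cluster_resources() call (the values reported depend on your cluster configuration):

# A non-empty result confirms the client is talking to the remote cluster.
print(ray.cluster_resources())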
Read data from BigQuery
Read data from your BigQuery dataset. A
Ray Task must perform
the read operation.
Note: The maximum query response size is 10 GB.

aiplatform.init(project=PROJECT_ID, location=LOCATION)

@ray.remote
def run_remotely():
    import vertex_ray
    dataset = DATASET
    parallelism = PARALLELISM
    query = QUERY

    ds = vertex_ray.data.read_bigquery(
        dataset=dataset,
        parallelism=parallelism,
        query=query
    )
    ds.materialize()

Where:
PROJECT_ID: Google Cloud project ID. Find the project ID
in the Google Cloud console welcome
page.
LOCATION: The location where the Dataset is stored. For example,
us-central1.
DATASET: BigQuery dataset. It must be in the format dataset.table.
Set to None if you provide a query.
PARALLELISM: An integer that influences how many read tasks are
created in parallel. There may be fewer read streams created than you
requested.
QUERY: A string containing a SQL query to read from the BigQuery database. Set to None if no query is required.
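As a concrete illustration, the following sketch reads with a SQL query instead of a full table scan. The project, location, and table names are hypothetical placeholders; substitute your own values.

aiplatform.init(project="my-project", location="us-central1")  # hypothetical project

@ray.remote
def run_remotely():
    import vertex_ray
    # dataset is None because a query is provided.
    ds = vertex_ray.data.read_bigquery(
        dataset=None,
        parallelism=10,
        query="SELECT * FROM `my-project.my_dataset.my_table` LIMIT 1000"  # hypothetical table
    )
    ds.materialize()
    return ds.count()

num_rows = ray.get(run_remotely.remote())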
Transform data
Update and delete rows and columns from your BigQuery tables using
pyarrow or pandas. If you want to use pandas transformations,
keep the input type as pyarrow and convert to pandas
within the user-defined function (UDF) so you can catch any pandas conversion
type errors within the UDF. A
Ray Task must perform the transformation.
@ray.remote
def run_remotely():
    # BigQuery Read first
    import pandas as pd
    import pyarrow as pa

    def filter_batch(table: pa.Table) -> pa.Table:
        df = table.to_pandas(types_mapper={pa.int64(): pd.Int64Dtype()}.get)
        # PANDAS_TRANSFORMATIONS_HERE
        return pa.Table.from_pandas(df)

    ds = ds.map_batches(filter_batch, batch_format="pyarrow").random_shuffle()
    ds.materialize()

    # You can repartition before writing to determine the number of write blocks.
    ds = ds.repartition(4)
    ds.materialize()
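For example, the PANDAS_TRANSFORMATIONS_HERE placeholder could be replaced with ordinary pandas operations. The following sketch assumes a hypothetical score column; it is one possible transformation, not part of the Vertex AI API:

def filter_batch(table: pa.Table) -> pa.Table:
    df = table.to_pandas(types_mapper={pa.int64(): pd.Int64Dtype()}.get)
    # Hypothetical transformations: drop rows missing the "score" column,
    # then rename the column before converting back to pyarrow.
    df = df.dropna(subset=["score"])
    df = df.rename(columns={"score": "model_score"})
    return pa.Table.from_pandas(df)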
Write data to BigQuery
Insert data into your BigQuery dataset. A
Ray Task must perform the write.
@ray.remote
def run_remotely():
    # BigQuery Read and optional data transformation first
    dataset = DATASET
    vertex_ray.data.write_bigquery(
        ds,
        dataset=dataset
    )
Where:
DATASET: BigQuery dataset. The dataset must be in the format dataset.table.
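Putting the steps together, a single Ray Task can read, transform, and write in one pass. This is a minimal end-to-end sketch; the dataset and table names are hypothetical.

@ray.remote
def run_remotely():
    import vertex_ray
    import pandas as pd
    import pyarrow as pa

    # Read from a hypothetical source table.
    ds = vertex_ray.data.read_bigquery(
        dataset="my_dataset.source_table",
        parallelism=10,
        query=None
    )

    def filter_batch(table: pa.Table) -> pa.Table:
        df = table.to_pandas(types_mapper={pa.int64(): pd.Int64Dtype()}.get)
        # Hypothetical no-op transformation; replace with your own logic.
        return pa.Table.from_pandas(df)

    ds = ds.map_batches(filter_batch, batch_format="pyarrow")

    # Write to a hypothetical destination table.
    vertex_ray.data.write_bigquery(ds, dataset="my_dataset.destination_table")

ray.get(run_remotely.remote())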
[[["Easy to understand","easyToUnderstand","thumb-up"],["Solved my problem","solvedMyProblem","thumb-up"],["Other","otherUp","thumb-up"]],[["Hard to understand","hardToUnderstand","thumb-down"],["Incorrect information or sample code","incorrectInformationOrSampleCode","thumb-down"],["Missing the information/samples I need","missingTheInformationSamplesINeed","thumb-down"],["Other","otherDown","thumb-down"]],["Last updated 2025-08-25 UTC."],[],[],null,["# Use Ray on Vertex AI with BigQuery\n\nWhen you run a Ray application on Vertex AI, use\n[BigQuery](/bigquery/docs/introduction) as your cloud database. This\nsection covers how to read from and write to a BigQuery database from\nyour on Vertex AI.\nThe steps in this section assume that you use\nthe Vertex AI SDK for Python.\n\nTo read from a BigQuery dataset, [create a new\nBigQuery dataset](/bigquery/docs/datasets) or use an existing dataset.\n\nImport and initialize Ray on Vertex AI client\n---------------------------------------------\n\nIf you're connected to your Ray cluster on Vertex AI, restart your\nkernel and run the following code. The `runtime_env` variable is necessary at\nconnection time to run BigQuery commands. \n\n```python\nimport ray\nfrom google.cloud import aiplatform\n\n# The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster.\naddress = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME)\n\nruntime_env = {\n \"pip\":\n [\"google-cloud-aiplatform[ray]\",\"ray==2.47.1\"]\n }\n\nray.init(address=address, runtime_env=runtime_env)\n```\n\nRead data from BigQuery\n-----------------------\n\nRead data from your BigQuery dataset. A\n[Ray Task](https://docs.ray.io/en/latest/ray-core/tasks.html) must perform\nthe read operation.\n**Note:** The maximum query response size is 10 GB. \n\n```python\naiplatform.init(project=PROJECT_ID, location=LOCATION)\n\n@ray.remote\ndef run_remotely():\n import vertex_ray\n dataset = DATASET\n parallelism = PARALLELISM\n query = QUERY\n\n ds = vertex_ray.data.read_bigquery(\n dataset=dataset,\n parallelism=parallelism,\n query=query\n )\n ds.materialize()\n```\n\nWhere:\n\n- **\u003cvar translate=\"no\"\u003ePROJECT_ID\u003c/var\u003e** : Google Cloud project ID. Find the project ID\n in the Google Cloud console [welcome](https://console.cloud.google.com/welcome)\n page.\n\n- **\u003cvar translate=\"no\"\u003eLOCATION\u003c/var\u003e** : The location where the `Dataset` is stored. For example,\n `us-central1`.\n\n- **\u003cvar translate=\"no\"\u003eDATASET\u003c/var\u003e** : BigQuery dataset. It must be in the format `dataset.table`.\n Set to `None` if you provide a query.\n\n- **\u003cvar translate=\"no\"\u003ePARALLELISM\u003c/var\u003e**: An integer that influences how many read tasks are\n created in parallel. There may be fewer read streams created than you\n requested.\n\n- **\u003cvar translate=\"no\"\u003eQUERY\u003c/var\u003e** : A string containing a SQL query to read from BigQuery database. Set to `None` if no query is required.\n\nTransform data\n--------------\n\nUpdate and delete rows and columns from your BigQuery tables using\n`pyarrow` or `pandas`. If you want to use `pandas` transformations,\nkeep the input type as pyarrow and convert to `pandas`\nwithin the user-defined function (UDF) so you can catch any `pandas` conversion\ntype errors within the UDF. A\n[Ray Task](https://docs.ray.io/en/latest/ray-core/tasks.html) must perform the transformation. 
\n\n```python\n@ray.remote\ndef run_remotely():\n # BigQuery Read first\n import pandas as pd\n import pyarrow as pa\n\n def filter_batch(table: pa.Table) -\u003e pa.Table:\n df = table.to_pandas(types_mapper={pa.int64(): pd.Int64Dtype()}.get)\n # PANDAS_TRANSFORMATIONS_HERE\n return pa.Table.from_pandas(df)\n\n ds = ds.map_batches(filter_batch, batch_format=\"pyarrow\").random_shuffle()\n ds.materialize()\n\n # You can repartition before writing to determine the number of write blocks\n ds = ds.repartition(4)\n ds.materialize()\n```\n\nWrite data to BigQuery\n----------------------\n\nInsert data to your BigQuery dataset. A\n[Ray Task](https://docs.ray.io/en/latest/ray-core/tasks.html) must perform the write. \n\n```python\n@ray.remote\ndef run_remotely():\n # BigQuery Read and optional data transformation first\n dataset=DATASET\n vertex_ray.data.write_bigquery(\n ds,\n dataset=dataset\n )\n```\n\nWhere:\n\n- **\u003cvar translate=\"no\"\u003eDATASET\u003c/var\u003e** : BigQuery dataset. The dataset must be in the format `dataset.table`.\n\nWhat's next\n-----------\n\n- [Deploy a model on Vertex AI\n and get predictions](/vertex-ai/docs/open-source/ray-on-vertex-ai/deploy-predict)\n\n- [View logs for your Ray cluster on Vertex AI](/vertex-ai/docs/open-source/ray-on-vertex-ai/view-logs)\n\n- [Delete a Ray cluster](/vertex-ai/docs/open-source/ray-on-vertex-ai/delete-cluster)"]]