# Use Ray on Vertex AI with BigQuery

When you run a Ray application on Vertex AI, use
[BigQuery](/bigquery/docs/introduction) as your cloud database. This
section covers how to read from and write to a BigQuery database from
your Ray cluster on Vertex AI. The steps in this section assume that you
use the Vertex AI SDK for Python.

To read from a BigQuery dataset, [create a new
BigQuery dataset](/bigquery/docs/datasets) or use an existing dataset.
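If you want to create the dataset programmatically rather than in the console, one option is the `google-cloud-bigquery` client library. The following is a minimal sketch, not part of this guide's flow; the project and dataset IDs are hypothetical, and it assumes the library is installed and your credentials are already configured:

```python
from google.cloud import bigquery

# Hypothetical project and dataset IDs; replace with your own.
client = bigquery.Client(project="my-project")

dataset = bigquery.Dataset("my-project.my_dataset")
dataset.location = "us-central1"  # colocate with your Ray cluster's region

# exists_ok=True makes the call idempotent if the dataset already exists.
client.create_dataset(dataset, exists_ok=True)
```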
Import and initialize Ray on Vertex AI client
---------------------------------------------

If you're connected to your Ray cluster on Vertex AI, restart your
kernel and run the following code. The `runtime_env` variable is necessary at
connection time to run BigQuery commands.

```python
import ray
from google.cloud import aiplatform

# The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster.
address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME)

runtime_env = {
    "pip": ["google-cloud-aiplatform[ray]", "ray==2.47.1"]
}

ray.init(address=address, runtime_env=runtime_env)
```
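For context, `CLUSTER_RESOURCE_NAME` is the resource name returned when the cluster is created. Below is a hedged sketch of that call with hypothetical machine shapes; consult the cluster-creation documentation for the authoritative parameters:

```python
from google.cloud import aiplatform
from google.cloud.aiplatform import vertex_ray

# Hypothetical project and region; replace with your own.
aiplatform.init(project="my-project", location="us-central1")

# Hypothetical node shapes; adjust machine types and counts to your workload.
CLUSTER_RESOURCE_NAME = vertex_ray.create_ray_cluster(
    head_node_type=vertex_ray.Resources(machine_type="n1-standard-16", node_count=1),
    worker_node_types=[vertex_ray.Resources(machine_type="n1-standard-16", node_count=1)],
)
```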
Read data from BigQuery
-----------------------

Read data from your BigQuery dataset. A
[Ray Task](https://docs.ray.io/en/latest/ray-core/tasks.html) must perform
the read operation.

**Note:** The maximum query response size is 10 GB.

```python
aiplatform.init(project=PROJECT_ID, location=LOCATION)

@ray.remote
def run_remotely():
    import vertex_ray
    dataset = DATASET
    parallelism = PARALLELISM
    query = QUERY

    ds = vertex_ray.data.read_bigquery(
        dataset=dataset,
        parallelism=parallelism,
        query=query
    )
    ds.materialize()
```

Where:

- **PROJECT_ID**: Your Google Cloud project ID. You can find the project ID
  on the Google Cloud console [welcome](https://console.cloud.google.com/welcome)
  page.

- **LOCATION**: The location where the `Dataset` is stored. For example,
  `us-central1`.

- **DATASET**: The BigQuery dataset, in the format `dataset.table`. Set to
  `None` if you provide a query.

- **PARALLELISM**: An integer that influences how many read tasks are
  created in parallel. Fewer read streams might be created than you
  requested.

- **QUERY**: A string containing a SQL query to run against the
  BigQuery database. Set to `None` if no query is required, as shown in the
  example after this list.
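For example, here is a hypothetical invocation that reads with a SQL query instead of a table reference, so `dataset` is `None`; the table name and row limit are placeholders:

```python
@ray.remote
def read_with_query():
    import vertex_ray

    # dataset is None because a query is supplied; the table name is hypothetical.
    ds = vertex_ray.data.read_bigquery(
        dataset=None,
        parallelism=10,
        query="SELECT * FROM `my-project.my_dataset.my_table` LIMIT 10000",
    )
    ds.materialize()
    return ds.count()

print(ray.get(read_with_query.remote()))
```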
[[["易于理解","easyToUnderstand","thumb-up"],["解决了我的问题","solvedMyProblem","thumb-up"],["其他","otherUp","thumb-up"]],[["很难理解","hardToUnderstand","thumb-down"],["信息或示例代码不正确","incorrectInformationOrSampleCode","thumb-down"],["没有我需要的信息/示例","missingTheInformationSamplesINeed","thumb-down"],["翻译问题","translationIssue","thumb-down"],["其他","otherDown","thumb-down"]],["最后更新时间 (UTC):2025-08-19。"],[],[],null,["# Use Ray on Vertex AI with BigQuery\n\nWhen you run a Ray application on Vertex AI, use\n[BigQuery](/bigquery/docs/introduction) as your cloud database. This\nsection covers how to read from and write to a BigQuery database from\nyour on Vertex AI.\nThe steps in this section assume that you use\nthe Vertex AI SDK for Python.\n\nTo read from a BigQuery dataset, [create a new\nBigQuery dataset](/bigquery/docs/datasets) or use an existing dataset.\n\nImport and initialize Ray on Vertex AI client\n---------------------------------------------\n\nIf you're connected to your Ray cluster on Vertex AI, restart your\nkernel and run the following code. The `runtime_env` variable is necessary at\nconnection time to run BigQuery commands. \n\n```python\nimport ray\nfrom google.cloud import aiplatform\n\n# The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster.\naddress = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME)\n\nruntime_env = {\n \"pip\":\n [\"google-cloud-aiplatform[ray]\",\"ray==2.47.1\"]\n }\n\nray.init(address=address, runtime_env=runtime_env)\n```\n\nRead data from BigQuery\n-----------------------\n\nRead data from your BigQuery dataset. A\n[Ray Task](https://docs.ray.io/en/latest/ray-core/tasks.html) must perform\nthe read operation.\n**Note:** The maximum query response size is 10 GB. \n\n```python\naiplatform.init(project=PROJECT_ID, location=LOCATION)\n\n@ray.remote\ndef run_remotely():\n import vertex_ray\n dataset = DATASET\n parallelism = PARALLELISM\n query = QUERY\n\n ds = vertex_ray.data.read_bigquery(\n dataset=dataset,\n parallelism=parallelism,\n query=query\n )\n ds.materialize()\n```\n\nWhere:\n\n- **\u003cvar translate=\"no\"\u003ePROJECT_ID\u003c/var\u003e** : Google Cloud project ID. Find the project ID\n in the Google Cloud console [welcome](https://console.cloud.google.com/welcome)\n page.\n\n- **\u003cvar translate=\"no\"\u003eLOCATION\u003c/var\u003e** : The location where the `Dataset` is stored. For example,\n `us-central1`.\n\n- **\u003cvar translate=\"no\"\u003eDATASET\u003c/var\u003e** : BigQuery dataset. It must be in the format `dataset.table`.\n Set to `None` if you provide a query.\n\n- **\u003cvar translate=\"no\"\u003ePARALLELISM\u003c/var\u003e**: An integer that influences how many read tasks are\n created in parallel. There may be fewer read streams created than you\n requested.\n\n- **\u003cvar translate=\"no\"\u003eQUERY\u003c/var\u003e** : A string containing a SQL query to read from BigQuery database. Set to `None` if no query is required.\n\nTransform data\n--------------\n\nUpdate and delete rows and columns from your BigQuery tables using\n`pyarrow` or `pandas`. If you want to use `pandas` transformations,\nkeep the input type as pyarrow and convert to `pandas`\nwithin the user-defined function (UDF) so you can catch any `pandas` conversion\ntype errors within the UDF. A\n[Ray Task](https://docs.ray.io/en/latest/ray-core/tasks.html) must perform the transformation. 
Write data to BigQuery
----------------------

Insert data into your BigQuery dataset. A
[Ray Task](https://docs.ray.io/en/latest/ray-core/tasks.html) must perform
the write.

```python
@ray.remote
def run_remotely():
    # BigQuery Read and optional data transformation first
    dataset = DATASET
    vertex_ray.data.write_bigquery(
        ds,
        dataset=dataset
    )
```

Where:

- **DATASET**: The BigQuery dataset, in the format `dataset.table`.

What's next
-----------

- [Deploy a model on Vertex AI
  and get predictions](/vertex-ai/docs/open-source/ray-on-vertex-ai/deploy-predict)

- [View logs for your Ray cluster on Vertex AI](/vertex-ai/docs/open-source/ray-on-vertex-ai/view-logs)

- [Delete a Ray cluster](/vertex-ai/docs/open-source/ray-on-vertex-ai/delete-cluster)