使用遠端函式
您可以使用 BigQuery 遠端函式,以 SQL 和 JavaScript 以外的語言,或 BigQuery 使用者定義函式不允許的程式庫或服務,實作函式。
總覽
透過 BigQuery 遠端函式,您可以直接整合 Cloud Run 函式和 Cloud Run,將 GoogleSQL 功能與 BigQuery 以外的軟體整合。透過 BigQuery 遠端函式,您可以在 Cloud Run 函式或使用任何支援語言實作的 Cloud Run 中部署函式,然後從 GoogleSQL 查詢中叫用這些函式。
工作流程
- 在 Cloud Run 函式或 Cloud Run 中建立 HTTP 端點。
- 在 BigQuery 中建立遠端函式。
- 建立
CLOUD_RESOURCE
類型的連線。 - 建立遠端函式。
- 建立
- 在查詢中使用遠端函式,就像使用其他使用者定義函式一樣。
限制
遠端函式僅支援下列任一資料類型做為引數類型或傳回類型:
- 布林值
- 位元組
- 數字
- 字串
- 日期
- 日期時間
- 時間
- 時間戳記
- JSON
遠端函式不支援
ARRAY
、STRUCT
、INTERVAL
或GEOGRAPHY
類型。您無法建立資料表值遠端函式。
建立具體化檢視表時,您無法使用遠端函式。
系統一律會假設遠端函式的傳回值具有不確定性,因此不會快取呼叫遠端函式的查詢結果。
即使收到成功回應,您可能還是會看到重複的請求,其中包含相同的資料,這是因為發生暫時性網路錯誤或 BigQuery 內部錯誤。
如果某些資料列因短路而略過遠端函式評估作業 (例如,在條件式運算式或含有
WHEN [NOT] MATCHED
的MERGE
陳述式中),則不會將批次處理與遠端函式搭配使用。在這種情況下,HTTP 要求主體中的calls
欄位只有一個元素。如果與遠端函式相關聯的資料集是透過跨區域資料集複製複製到目的地區域,則只能在建立遠端函式的區域中查詢該函式。
建立端點
如要建立可實作商業邏輯的遠端函式,您必須使用 Cloud Run 函式或 Cloud Run 建立 HTTP 端點。端點必須能夠在單一 HTTP POST 要求中處理一批資料列,並以 HTTP 回應的形式傳回這批資料列的結果。
如果您是使用 BigQuery DataFrames 建立遠端函式,就不需要手動建立 HTTP 端點,服務會自動為您完成這項操作。
請參閱 Cloud Run 函式教學課程和其他 Cloud Run 函式說明文件,瞭解如何編寫、部署、測試及維護 Cloud Run 函式。
請參閱 Cloud Run 快速入門和其他 Cloud Run 說明文件,瞭解如何編寫、部署、測試及維護 Cloud Run 服務。
建議您保留預設驗證機制,不要允許未經驗證的 Cloud Run 函式或 Cloud Run 服務叫用。
輸入格式
BigQuery 會以以下格式傳送含有 JSON 主體的 HTTP POST 要求:
欄位名稱 | 說明 | 欄位類型 |
---|---|---|
requestId | 要求的 ID。在 GoogleSQL 查詢中,傳送至此端點的多個要求中不重複的值。 | 一律提供。字串。 |
呼叫者 | 呼叫遠端函式的 GoogleSQL 查詢工作完整資源名稱。 | 一律提供。字串。 |
sessionUser | 執行 GoogleSQL 查詢的使用者電子郵件。 | 一律提供。字串。 |
userDefinedContext | 在 BigQuery 中建立遠端函式時使用的使用者定義情境。 | (選用步驟) 含有鍵/值組合的 JSON 物件。 |
通話 | 一批輸入資料。 | 一律提供。JSON 陣列。 每個元素本身都是 JSON 陣列,也就是一個遠端函式呼叫的 JSON 編碼引數清單。 |
請參考以下要求範例:
{
"requestId": "124ab1c",
"caller": "//bigquery.googleapis.com/projects/myproject/jobs/myproject:US.bquxjob_5b4c112c_17961fafeaf",
"sessionUser": "test-user@test-company.com",
"userDefinedContext": {
"key1": "value1",
"key2": "v2"
},
"calls": [
[null, 1, "", "abc"],
["abc", "9007199254740993", null, null]
]
}
輸出格式
BigQuery 預期端點應以以下格式傳回 HTTP 回應,否則 BigQuery 無法使用該回應,且會導致呼叫遠端函式的查詢失敗。
欄位名稱 | 說明 | 值範圍 |
回覆 | 一批回傳值。 | 這是成功回應的必要條件。JSON 陣列。 每個元素都對應至外部函式的 JSON 編碼回傳值。
陣列大小必須與 HTTP 要求中 |
errorMessage | 傳回 200 以外的 HTTP 回應碼時的錯誤訊息。對於無法重試的錯誤,我們會將此錯誤訊息傳回給使用者,做為 BigQuery 作業的錯誤訊息。 | (選用步驟) 字串。大小不得超過 1 KB。 |
成功回應範例:
{
"replies": [
1,
0
]
}
失敗回應範例:
{
"errorMessage": "Received but not expected that the argument 0 be null".
}
HTTP 回應碼
端點應傳回 HTTP 回應代碼 200,表示成功回應。如果 BigQuery 收到任何其他值,就會將回應視為失敗,並在 HTTP 回應碼為 408、429、500、503 或 504 時重試,直到達到某些內部限制為止。
SQL 資料類型的 JSON 編碼
HTTP 要求/回應中的 JSON 編碼會遵循 TO_JSON_STRING 函式的現有 BigQuery JSON 編碼。
Cloud Run 函式程式碼範例
以下 Python 程式碼範例會實作新增遠端函式的所有整數引數。它會處理含有批次叫用引數的要求,並在回應中傳回所有結果。
import functions_framework
from flask import jsonify
# Max INT64 value encoded as a number in JSON by TO_JSON_STRING. Larger values are encoded as
# strings.
# See https://cloud.google.com/bigquery/docs/reference/standard-sql/json_functions#json_encodings
_MAX_LOSSLESS=9007199254740992
@functions_framework.http
def batch_add(request):
try:
return_value = []
request_json = request.get_json()
calls = request_json['calls']
for call in calls:
return_value.append(sum([int(x) if isinstance(x, str) else x for x in call if x is not None]))
replies = [str(x) if x > _MAX_LOSSLESS or x < -_MAX_LOSSLESS else x for x in return_value]
return_json = jsonify( { "replies": replies } )
return return_json
except Exception as e:
return jsonify( { "errorMessage": str(e) } ), 400
假設函式已部署至 us-east1
區域的 my_gcf_project
專案,並以函式名稱 remote_add
命名,則可透過端點 https://us-east1-my_gcf_project.cloudfunctions.net/remote_add
存取該函式。
Cloud Run 程式碼範例
下列 Python 程式碼範例會實作網頁服務,您可以建構並將其部署至 Cloud Run 以便使用相同功能。
import os
from flask import Flask, request, jsonify
# Max INT64 value encoded as a number in JSON by TO_JSON_STRING. Larger values are encoded as
# strings.
# See https://cloud.google.com/bigquery/docs/reference/standard-sql/json_functions#json_encodings
_MAX_LOSSLESS=9007199254740992
app = Flask(__name__)
@app.route("/", methods=['POST'])
def batch_add():
try:
return_value = []
request_json = request.get_json()
calls = request_json['calls']
for call in calls:
return_value.append(sum([int(x) if isinstance(x, str) else x for x in call if x is not None]))
replies = [str(x) if x > _MAX_LOSSLESS or x < -_MAX_LOSSLESS else x for x in return_value]
return jsonify( { "replies" : replies } )
except Exception as e:
return jsonify( { "errorMessage": str(e) } ), 400
if __name__ == "__main__":
app.run(debug=True, host="0.0.0.0", port=int(os.environ.get("PORT", 8080)))
請參閱指南,瞭解如何建構及部署程式碼。
假設 Cloud Run 服務已部署至 us-east1
區域的 my_gcf_project
專案,並以 remote_add
做為服務名稱,則可透過 https://remote_add-<project_id_hash>-ue.a.run.app
端點存取該服務。
建立遠端函式
BigQuery 會使用 CLOUD_RESOURCE
連線與 Cloud Run 函式互動。如要建立遠端函式,您必須建立 CLOUD_RESOURCE
連線。如果您是使用 BigQuery DataFrames 建立遠端函式,且已獲得「Project IAM Admin (roles/resourcemanager.projectIamAdmin
)」角色,則不必手動建立連線並授予存取權,服務會自動為您完成這項操作。
建立連線
您必須擁有 Cloud 資源連線,才能連線至 Cloud Run 函式和 Cloud Run。
如果您已設定具備適當權限的預設連線,可以略過這個步驟。
為遠端模型建立Cloud 資源連線,並取得連線的服務帳戶。請在與您在上一個步驟中建立的資料集相同的位置建立連線。
選取下列選項之一:
主控台
前往「BigQuery」頁面
在「Explorer」窗格中,按一下
「新增資料」:「Add data」對話方塊隨即開啟。
在「Filter By」窗格中的「Data Source Type」部分,選取「Business Applications」。
或者,您也可以在「Search for data sources」欄位中輸入
Vertex AI
。在「精選資料來源」部分,按一下「Vertex AI」。
按一下「Vertex AI 模型:BigQuery 聯盟」解決方案資訊卡。
在「連線類型」清單中,選取「Vertex AI 遠端模型、遠端函式和 BigLake (Cloud 資源)」。
在「連線 ID」欄位中,輸入連線的名稱。
點選「建立連線」。
按一下「前往連線」。
在「連線資訊」窗格中,複製服務帳戶 ID,以便在後續步驟中使用。
bq
在指令列環境中建立連線:
bq mk --connection --location=REGION --project_id=PROJECT_ID \ --connection_type=CLOUD_RESOURCE CONNECTION_ID
--project_id
參數會覆寫預設專案。更改下列內容:
REGION
:您的連線區域PROJECT_ID
:您的 Google Cloud 專案 IDCONNECTION_ID
:連線 ID
建立連線資源時,BigQuery 會建立專屬的系統服務帳戶,並將該帳戶與連線建立關聯。
疑難排解:如果您收到下列連線錯誤,請更新 Google Cloud SDK:
Flags parsing error: flag --connection_type=CLOUD_RESOURCE: value should be one of...
擷取並複製服務帳戶 ID,以便在後續步驟中使用:
bq show --connection PROJECT_ID.REGION.CONNECTION_ID
輸出結果會與下列內容相似:
name properties 1234.REGION.CONNECTION_ID {"serviceAccountId": "connection-1234-9u56h9@gcp-sa-bigquery-condel.iam.gserviceaccount.com"}
Terraform
使用 google_bigquery_connection
資源。
如要向 BigQuery 進行驗證,請設定應用程式預設憑證。詳情請參閱「設定用戶端程式庫的驗證機制」。
以下範例會在 US
地區中建立名為 my_cloud_resource_connection
的 Cloud 資源連線:
如要在 Google Cloud 專案中套用 Terraform 設定,請完成下列各節中的步驟。
準備 Cloud Shell
- 啟動 Cloud Shell。
-
設定要套用 Terraform 設定的預設 Google Cloud 專案。
您只需為每個專案執行這個指令一次,而且可以在任何目錄中執行。
export GOOGLE_CLOUD_PROJECT=PROJECT_ID
如果您在 Terraform 設定檔中設定明確的值,系統就會覆寫環境變數。
準備目錄
每個 Terraform 設定檔都必須有自己的目錄 (也稱為根模組)。
-
在 Cloud Shell 中建立目錄,並在該目錄中建立新檔案。檔案名稱必須包含
.tf
副檔名,例如main.tf
。在本教學課程中,檔案稱為main.tf
。mkdir DIRECTORY && cd DIRECTORY && touch main.tf
-
如果您正在參考教學課程,可以複製各個章節或步驟中的程式碼範例。
將範例程式碼複製到新建立的
main.tf
中。您可以視需要從 GitHub 複製程式碼。如果 Terraform 程式碼片段是端對端解決方案的一部分,建議您採用這種做法。
- 查看並修改要套用至環境的範例參數。
- 儲存變更。
-
初始化 Terraform。這項操作只需對每個目錄執行一次。
terraform init
如要使用最新版的 Google 供應器,您可以選擇加入
-upgrade
選項:terraform init -upgrade
套用變更
-
檢查設定,確認 Terraform 要建立或更新的資源符合您的預期:
terraform plan
視需要修正設定。
-
執行下列指令,並在提示中輸入
yes
,即可套用 Terraform 設定:terraform apply
等待 Terraform 顯示「Apply complete!」(套用完成) 訊息。
- 開啟 Google Cloud 專案即可查看結果。在 Google Cloud 控制台中,前往 UI 中的資源,確認 Terraform 已建立或更新這些資源。
設定存取權
您必須將 Cloud Run 函式或 Cloud Run 服務的唯讀權限授予新連線。不建議為 Cloud Run 函式或 Cloud Run 服務允許未經驗證的叫用要求。
如要授予角色,請按照下列步驟操作:
前往「IAM & Admin」(IAM 與管理) 頁面。
按一下
「新增」。系統會開啟「Add principals」對話方塊。
在「新增主體」欄位,輸入先前複製的服務帳戶 ID。
在「請選取角色」欄位中,選取下列其中一個選項:
- 如果您使用的是第 1 代 Cloud Run 函式,請選擇「Cloud Function」,然後選取「Cloud Function Invoker role」。
- 如果您使用的是第 2 代 Cloud Run 函式,請選擇「Cloud Run」,然後選取「Cloud Run Invoker 角色」。
- 如果您使用的是 Cloud Run 服務,請選擇「Cloud Run」,然後選取「Cloud Run Invoker 角色」。
按一下 [儲存]。
建立遠端函式
如要建立遠端函式,請按照下列步驟操作:
SQL
在 BigQuery 中執行下列 CREATE FUNCTION
陳述式:
前往 Google Cloud 控制台的「BigQuery」頁面。
在查詢編輯器中輸入以下陳述式:
CREATE FUNCTION
PROJECT_ID.DATASET_ID
.remote_add(x INT64, y INT64) RETURNS INT64 REMOTE WITH CONNECTIONPROJECT_ID.LOCATION.CONNECTION_NAME
OPTIONS ( endpoint = 'ENDPOINT_URL' )請依指示取代下列項目:
DATASET_ID
:BigQuery 資料集的 ID。ENDPOINT_URL
:Cloud Run 函式或 Cloud Run 遠端函式端點的網址。
按一下
「Run」。
如要進一步瞭解如何執行查詢,請參閱「執行互動式查詢」一文。
BigQuery DataFrames
- 請啟用必要的 API,並確認您已獲得必要的角色,詳情請參閱「遠端函式」的「必要條件」一節。
-
import bigframes.pandas as bpd # Set BigQuery DataFrames options bpd.options.bigquery.project = your_gcp_project_id bpd.options.bigquery.location = "US" # BigQuery DataFrames gives you the ability to turn your custom scalar # functions into a BigQuery remote function. It requires the GCP project to # be set up appropriately and the user having sufficient privileges to use # them. One can find more details about the usage and the requirements via # `help` command. help(bpd.remote_function) # Read a table and inspect the column of interest. df = bpd.read_gbq("bigquery-public-data.ml_datasets.penguins") df["body_mass_g"].head(10) # Define a custom function, and specify the intent to turn it into a remote # function. It requires a BigQuery connection. If the connection is not # already created, BigQuery DataFrames will attempt to create one assuming # the necessary APIs and IAM permissions are setup in the project. In our # examples we will be letting the default connection `bigframes-default-connection` # be used. We will also set `reuse=False` to make sure we don't # step over someone else creating remote function in the same project from # the exact same source code at the same time. Let's try a `pandas`-like use # case in which we want to apply a user defined scalar function to every # value in a `Series`, more specifically bucketize the `body_mass_g` value # of the penguins, which is a real number, into a category, which is a # string. @bpd.remote_function( reuse=False, cloud_function_service_account="default", ) def get_bucket(num: float) -> str: if not num: return "NA" boundary = 4000 return "at_or_above_4000" if num >= boundary else "below_4000" # Then we can apply the remote function on the `Series` of interest via # `apply` API and store the result in a new column in the DataFrame. df = df.assign(body_mass_bucket=df["body_mass_g"].apply(get_bucket)) # This will add a new column `body_mass_bucket` in the DataFrame. You can # preview the original value and the bucketized value side by side. df[["body_mass_g", "body_mass_bucket"]].head(10) # The above operation was possible by doing all the computation on the # cloud. For that, there is a google cloud function deployed by serializing # the user code, and a BigQuery remote function created to call the cloud # function via the latter's http endpoint on the data in the DataFrame. # The BigQuery remote function created to support the BigQuery DataFrames # remote function can be located via a property `bigframes_remote_function` # set in the remote function object. print(f"Created BQ remote function: {get_bucket.bigframes_remote_function}") # The cloud function can be located via another property # `bigframes_cloud_function` set in the remote function object. print(f"Created cloud function: {get_bucket.bigframes_cloud_function}") # Warning: The deployed cloud function may be visible to other users with # sufficient privilege in the project, so the user should be careful about # having any sensitive data in the code that will be deployed as a remote # function. # Let's continue trying other potential use cases of remote functions. Let's # say we consider the `species`, `island` and `sex` of the penguins # sensitive information and want to redact that by replacing with their hash # code instead. Let's define another scalar custom function and decorate it # as a remote function. The custom function in this example has external # package dependency, which can be specified via `packages` parameter. @bpd.remote_function( reuse=False, packages=["cryptography"], cloud_function_service_account="default", ) def get_hash(input: str) -> str: from cryptography.fernet import Fernet # handle missing value if input is None: input = "" key = Fernet.generate_key() f = Fernet(key) return f.encrypt(input.encode()).decode() # We can use this remote function in another `pandas`-like API `map` that # can be applied on a DataFrame df_redacted = df[["species", "island", "sex"]].map(get_hash) df_redacted.head(10)
您必須在建立遠端函式的資料集中具備 bigquery.routines.create
權限,並在遠端函式使用的連線中具備 bigquery.connections.delegate
權限 (可透過 BigQuery 連線管理員角色取得)。
提供使用者定義的背景資訊
您可以在 OPTIONS
中指定 user_defined_context
做為鍵/值組合的形式,這會是傳送至端點的每個 HTTP 要求的一部分。使用者定義的內容可讓您建立多個遠端函式,但重複使用單一端點,根據傳遞至該端點的內容提供不同的行為。
以下範例會建立兩個遠端函式,以便使用相同的端點加密及解密 BYTES
資料。
CREATE FUNCTION `PROJECT_ID.DATASET_ID`.encrypt(x BYTES)
RETURNS BYTES
REMOTE WITH CONNECTION `PROJECT_ID.LOCATION.CONNECTION_NAME`
OPTIONS (
endpoint = 'ENDPOINT_URL',
user_defined_context = [("mode", "encryption")]
)
CREATE FUNCTION `PROJECT_ID.DATASET_ID`.decrypt(x BYTES)
RETURNS BYTES
REMOTE WITH CONNECTION `PROJECT_ID.LOCATION.CONNECTION_NAME`
OPTIONS (
endpoint = 'ENDPOINT_URL',
user_defined_context = [("mode", "decryption")]
)
限制批次要求中的列數
您可以在 OPTIONS
中指定 max_batching_rows
,做為每個 HTTP 要求的最大列數,以免 Cloud Run 函式逾時。如果您指定 max_batching_rows
,BigQuery 會判斷批次中的資料列數量,上限為 max_batching_rows
。如未指定,BigQuery 會自動決定要批次處理的資料列數量。
在查詢中使用遠端函式
請確認您已授予 Cloud Run 函式的權限,以便 BigQuery 的服務帳戶存取與遠端函式連結相關聯的資料。
此外,您還必須在遠端函式所在的資料集上擁有 bigquery.routines.get
權限,並在遠端函式使用的連線上擁有 bigquery.connections.use
權限 (可透過 BigQuery Connection User
角色取得)。
您可以在查詢中使用遠端函式,就像使用使用者定義的函式一樣。
例如,您可以在範例查詢中使用 remote_add
函式:
SELECT
val,
`PROJECT_ID.DATASET_ID`.remote_add(val, 2)
FROM
UNNEST([NULL,2,3,5,8]) AS val;
此範例會產生以下輸出內容:
+------+-----+
| val | f0_ |
+------+-----+
| NULL | 2 |
| 2 | 4 |
| 3 | 5 |
| 5 | 7 |
| 8 | 10 |
+------+-----+
支援的地區
BigQuery 中的地點有兩種類型:
「地區」是特定的地理位置,例如倫敦。
「多地區」是指包含兩個及以上地理位置的大型地理區域,例如美國。
單一地區
在 BigQuery 單一地區資料集中,您只能建立使用在相同地區部署的 Cloud Run 函式做為遠端函式。例如:
- BigQuery 單一區域
us-east4
中的遠端函式只能使用us-east4
中的 Cloud Run 函式。
因此,如果是單一區域,只有同時支援 Cloud Run 函式和 BigQuery 的區域才能支援遠端函式。
多區域
在 BigQuery 多區域 (US
、EU
) 資料集中,您只能建立使用 Cloud Run 函式 (已部署在同一大地理區域 (US、EU) 中的某個區域) 的遠端函式。例如:
- BigQuery
US
多區域中的遠端函式只能使用在美國地理區域內任何單一區域中部署的 Cloud Run 函式,例如us-central1
、us-east4
、us-west2
等。 - BigQuery
EU
多區域中的遠端函式只能使用在歐洲聯盟成員國的任何單一區域中部署的 Cloud Run 函式,例如europe-north1
、europe-west3
等。
如要進一步瞭解 BigQuery 單一地區和多地區,請參閱「資料集位置」頁面。如要進一步瞭解 Cloud Run 函式區域,請參閱「Cloud Run 函式位置」頁面。
連線
無論是單一地區或多地區位置,您只能在與所用連線相同的位置建立遠端函式。舉例來說,如要在 US
多區域中建立遠端函式,請使用位於 US
多區域中的連線。
定價
適用標準 BigQuery 定價。
此外,使用這項功能可能會產生 Cloud Run 函式和 Cloud Run 費用。詳情請參閱 Cloud Run 函式和 Cloud Run 定價頁面。
使用 VPC Service Controls
VPC Service Controls 是 Google Cloud 可讓您設定安全範圍,以防範資料外洩的功能。如要使用 VPC Service Controls 搭配遠端函式來強化安全性,或是使用端點搭配 internal traffic
輸入設定,請按照 VPC Service Controls 指南的步驟操作:
建立服務範圍。
將使用遠端函式的查詢 BigQuery 專案新增至範圍。
將端點專案新增至範圍,並根據端點類型在受限制的服務中設定
Cloud Functions API
或Cloud Run API
。詳情請參閱「Cloud Run 函式 VPC Service Controls」和「Cloud Run VPC Service Controls」。
遠端函式的最佳做法
預先篩選輸入內容:如果輸入內容在傳遞到遠端函式之前,能夠輕易地進行篩選,您查詢的執行速度可能會更快,費用也可能會更便宜。
確保 Cloud Run 函式可擴充。可擴充性取決於最小執行個體數量、最大執行個體數量和並行性。
- 盡可能使用 Cloud Run 函式的執行個體上限預設值。
- 請注意,第 1 代 HTTP Cloud Run 函式沒有預設限制。為了避免在測試或正式版環境中,第 1 代 HTTP Cloud Run 函式發生無限擴充事件,建議您設定限制,例如 3000。
請參考其他Cloud Run 函式提示,提升效能。與延遲時間較長的 Cloud Run 函式互動的遠端函式查詢可能會因逾時而失敗。
實作端點,以便針對失敗的回應傳回正確的 HTTP 回應代碼和酬載。
為減少 BigQuery 的重試次數,請使用 408、429、500、503 和 504 以外的 HTTP 回應碼來處理失敗的回應,並確保在函式程式碼中擷取所有例外狀況。否則,HTTP 服務架構可能會自動傳回 500 以處理任何未偵測到的例外狀況。當 BigQuery 重試失敗的資料分區或查詢時,您可能仍會看到重試的 HTTP 要求。
端點應以定義的格式傳回 JSON 酬載,以便處理失敗的回應。雖然這並非必要,但有助於 BigQuery 區分失敗回應是否來自函式實作或 Cloud Run 函式/Cloud Run 的基礎架構。針對後者,BigQuery 可能會以不同的內部限制重試。
配額
如要瞭解遠端函式配額,請參閱「配額與限制」。