透過 Confluent Cloud 匯入主題,您可以持續從 Confluent Cloud 擷取資料,並將資料做為外部來源匯入 Pub/Sub。接著,您可以將資料串流至 Pub/Sub 支援的任何目的地。
本文件說明如何建立及管理 Confluent Cloud 匯入主題。如要建立標準主題,請參閱「建立標準主題」一文。
如要進一步瞭解匯入主題,請參閱「關於匯入主題」。
事前準備
進一步瞭解 Pub/Sub 發布程序。
設定工作負載身分聯盟,讓Google Cloud 能夠存取外部串流服務。
必要角色和權限
如要取得建立及管理 Confluent Cloud 匯入主題所需的權限,請要求管理員為您的主題或專案授予 Pub/Sub 編輯者 (roles/pubsub.editor
) 身分與存取權管理角色。如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和機構的存取權」。
這個預先定義的角色包含建立及管理 Confluent Cloud 匯入主題所需的權限。如要查看確切的必要權限,請展開「必要權限」部分:
所需權限
如要建立及管理 Confluent Cloud 匯入主題,您必須具備下列權限:
-
建立匯入主題:
pubsub.topics.create
-
刪除匯入主題:
pubsub.topics.delete
-
取得匯入主題:
pubsub.topics.get
-
列出匯入主題:
pubsub.topics.list
-
發布至匯入主題:
pubsub.topics.publish
-
更新匯入主題:
pubsub.topics.update
-
取得匯入主題的身分與存取權管理政策:
pubsub.topics.getIamPolicy
-
為匯入主題設定 IAM 政策:
pubsub.topics.setIamPolicy
您可以在專案層級和個別資源層級設定存取權控管。
設定聯合身分,以便存取 Confluent Cloud
Workload Identity 聯盟可讓 Google Cloud 服務存取在 Google Cloud以外執行的工作負載。有了身分聯盟,您就不必維護或傳遞憑證,就能 Google Cloud 存取其他雲端中的資源。您可以改用工作負載本身的識別資訊,驗證 Google Cloud 並存取資源。
在 Google Cloud中建立服務帳戶
這是選用步驟。如果您已有服務帳戶,可以在這項程序中使用該帳戶,而不需要建立新的服務帳戶。如果您使用現有的服務帳戶,請參閱記錄服務帳戶的唯一 ID,瞭解下一個步驟。
針對 Confluent Cloud 匯入主題,Pub/Sub 會使用服務帳戶做為身分,以便存取 Confluent Cloud 中的資源。
如要進一步瞭解如何建立服務帳戶,包括必要條件、必要角色和權限,以及命名規範,請參閱「建立服務帳戶」一文。建立服務帳戶後,您可能需要等待 60 秒或更長的時間,才能使用服務帳戶。這種行為的發生,是因為讀取作業最終會一致;新服務帳戶可能需要一段時間才會顯示。
記下服務帳戶專屬 ID
您需要服務帳戶專屬 ID,才能在 Confluent Cloud 控制台中設定身分提供者和資源池。
前往 Google Cloud 控制台的「Service account」詳細資料頁面。
按一下您剛建立的服務帳戶,或您打算使用的服務帳戶。
在「服務帳戶詳細資料」頁面中記下專屬 ID 編號。
您需要這組 ID 才能在工作流程中在 Confluent Cloud 控制台中設定身分識別資訊提供者和集區。
為 Pub/Sub 服務帳戶新增服務帳戶權杖建立者角色
服務帳戶權杖建立者角色 (roles/iam.serviceAccountTokenCreator
) 可讓實體為服務帳戶建立短期憑證。這些權杖或憑證用於模擬服務帳戶。
如要進一步瞭解服務帳戶模擬功能,請參閱「服務帳戶模擬功能」。
您也可以在這個程序中新增 Pub/Sub 發布者角色 (roles/pubsub.publisher
)。如要進一步瞭解這個角色以及為何要新增這個角色,請參閱「將 Pub/Sub 發布者角色新增至 Pub/Sub 服務帳戶」一文。
前往 Google Cloud 控制台的「IAM」頁面。
按一下「包含 Google提供的角色授予項目」核取方塊。
找出格式為
service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com
的服務帳戶。針對這個服務帳戶,按一下「Edit Principal」(編輯主體) 按鈕。
如有需要,請按一下「新增其他角色」。
搜尋並點選「服務帳戶憑證建立者角色」圖示 (
roles/iam.serviceAccountTokenCreator
)。按一下 [儲存]。
在 Confluent Cloud 中建立身分識別資訊提供者
如要向 Confluent Cloud 驗證,Google Cloud 服務帳戶需要一個身分池。您必須先在 Confluent Cloud 中建立識別資訊提供者。
如要進一步瞭解如何在 Confluent Cloud 中建立 ID 提供者,請參閱「新增 OAuth/OIDC ID 提供者」頁面。
在選單中,按一下「帳戶與存取權」。
按一下「Workload identities」。
按一下「新增提供者」。
依序點選「OAuth/OIDC」和「下一步」。
依序點選「其他 OIDC 提供者」和「下一步」。
請提供身分識別提供者的名稱和用途說明。
按一下「顯示進階設定」。
在「Issuer URI」欄位中輸入
https://accounts.google.com
。在「JWKS URI」欄位中輸入
https://www.googleapis.com/oauth2/v3/certs
。按一下「驗證並儲存」。
在 Confluent Cloud 中建立身分集區,並授予適當的角色
您必須在身分設定檔下方建立身分資料集區,並授予必要的角色,讓 Pub/Sub 服務帳戶能夠驗證 Confluent Cloud Kafka 主題並讀取該主題。
請先確認您已在 Confluent Cloud 中建立叢集,再繼續建立身分資料集區。
如要進一步瞭解如何建立身分驗證集區,請參閱「使用身分驗證集區與 OAuth/OIDC 身分驗證提供者」頁面。
在選單中,按一下「帳戶與存取權」。
按一下「Workload identities」。
按一下您在「在 Confluent Cloud 中建立身分識別資訊提供者」一文中建立的身分識別資訊提供者。
按一下「新增集區」。
提供身分資料集合的名稱和說明。
將「Identity claim」設為
claims
。在「設定篩選條件」下方,點選「進階」分頁標籤。請輸入以下程式碼:
claims.iss=='https://accounts.google.com' && claims.sub=='<SERVICE_ACCOUNT_UNIQUE_ID>'
將
<SERVICE_ACCOUNT_UNIQUE_ID>
替換為您在「記錄服務帳戶專屬 ID」一節中找到的服務帳戶專屬 ID。點選「下一步」。
按一下「新增權限」。然後點選「下一步」。
在相關叢集中,按一下「新增角色指派」。
按一下「操作員」角色,然後點選「新增」。
這個角色會授予 Pub/Sub 權限。服務帳戶可存取叢集,其中包含要擷取至 Pub/Sub 的 Confluent Kafka 主題。
在叢集下方,按一下「主題」。然後按一下「新增角色指派」。
選取「DeveloperRead」DeveloperRead角色。
按一下適當的選項,然後指定主題或前置字元。例如「特定主題」、「前置字串規則」或「所有主題」。
按一下「新增」。
點選「下一步」。
按一下「驗證並儲存」。
將 Pub/Sub 發布者角色新增至 Pub/Sub 授權對象
如要啟用發布功能,您必須為 Pub/Sub 服務帳戶指派發布者角色,讓 Pub/Sub 能夠發布至 Confluent Cloud 匯入主題。
啟用從所有主題發布內容的權限
如果您尚未建立任何 Confluent Cloud 匯入主題,請使用這個方法。
前往 Google Cloud 控制台的「IAM」頁面。
按一下「包含 Google提供的角色授予項目」核取方塊。
找出格式為
service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com
的服務帳戶。針對這個服務帳戶,按一下「Edit Principal」(編輯主體) 按鈕。
如有需要,請按一下「新增其他角色」。
搜尋並點選「Pub/Sub 發布者角色」 (
roles/pubsub.publisher
)。按一下 [儲存]。
啟用單一主題的發布功能
只有在 Confluent Cloud 匯入主題已存在時,才使用這個方法。
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
執行
gcloud pubsub topics add-iam-policy-binding
指令:gcloud pubsub topics add-iam-policy-binding TOPIC_ID \ --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-puiam.gserviceaccount.combsub." \ --role="roles/pubsub.publisher"
更改下列內容:
TOPIC_ID
:Confluent Cloud 匯入主題的主題 ID。PROJECT_NUMBER
:專案編號。如要查看專案編號,請參閱「識別專案」。
將服務帳戶使用者角色新增至服務帳戶
「服務帳戶使用者」角色 (roles/iam.serviceAccountUser
) 包含 iam.serviceAccounts.actAs
權限,可讓主體將服務帳戶連結至 Confluent Cloud 匯入主題的擷取設定,並使用該服務帳戶建立聯合身分。
前往 Google Cloud 控制台的「IAM」頁面。
針對發出建立或更新主題呼叫的主體,按一下「Edit Principal」按鈕。
如有需要,請按一下「新增其他角色」。
搜尋並點選「服務帳戶使用者角色」(
roles/iam.serviceAccountUser
)。按一下 [儲存]。
使用 Confluent Cloud 匯入主題
您可以建立新的匯入主題,或編輯現有主題。
注意事項
即使快速依序建立主題和訂閱項目,也可能導致資料遺失。在訂閱前,您可以短暫地查看該主題。如果在這個時間內有任何資料傳送至主題,就會遺失。請先建立主題、建立訂閱項目,然後再將主題轉換為匯入主題,這樣就能確保在匯入程序中不會遺漏任何訊息。
如果您需要重新建立現有匯入主題的 Kafka 主題,且兩者名稱相同,則不可以只刪除 Kafka 主題並重新建立。這項操作可能會使 Pub/Sub 的偏移管理無效,進而導致資料遺失。如要減輕這種情況,請按照下列步驟操作:
- 刪除 Pub/Sub 匯入主題。
- 刪除 Kafka 主題。
- 建立 Kafka 主題。
- 建立 Pub/Sub 匯入主題。
系統一律會從 最早的偏移量讀取 Confluent Cloud Kafka 主題的資料。
建立 Confluent Cloud 匯入主題
如要進一步瞭解與主題相關聯的屬性,請參閱「主題的屬性」。
請確認你已完成下列程序:
如要建立 Confluent Cloud 匯入主題,請按照下列步驟操作:
控制台
前往 Google Cloud 控制台的「Topics」(主題) 頁面。
按一下「建立主題」。
在「主題 ID」欄位中,輸入匯入主題的 ID。
如要進一步瞭解主題命名方式,請參閱命名規範。
選取「Add a default subscription」。
選取「啟用擷取」。
在「擷取來源」中,選取「Confluent Cloud」。
輸入下列詳細資訊:
Bootstrap 伺服器:叢集的 Bootstrap 伺服器,其中包含要擷取至 Pub/Sub 的 Kafka 主題。格式如下:
hostname:port
。叢集 ID:包含您要擷取至 Pub/Sub 的 Kafka 主題的叢集 ID。
主題:您要擷取至 Pub/Sub 的 Kafka 主題名稱。
身分集區 ID:用於透過 Confluent Cloud 進行驗證的身分集區的集區 ID。
服務帳戶:您在在 Google Cloud 中建立服務帳戶時建立的服務帳戶。
按一下「建立主題」。
gcloud
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
執行
gcloud pubsub topics create
指令:gcloud pubsub topics create TOPIC_ID \ --confluent-cloud-ingestion-bootstrap-server CONFLUENT_BOOTSTRAP_SERVER \ --confluent-cloud-ingestion-cluster-id CONFLUENT_CLUSTER_ID \ --confluent-cloud-ingestion-topic CONFLUENT_TOPIC \ --confluent-cloud-ingestion-identity-pool-id CONFLUENT_IDENTITY_POOL_ID \ --confluent-cloud-ingestion-service-account PUBSUB_SERVICE_ACCOUNT
更改下列內容:
TOPIC_ID
:Pub/Sub 主題的名稱或 ID。CONFLUENT_BOOTSTRAP_SERVER
:叢集的 Bootstrap 伺服器,其中包含要擷取至 Pub/Sub 的 Kafka 主題。格式如下:hostname:port
。CONFLUENT_CLUSTER_ID
:叢集 ID,其中包含要擷取至 Pub/Sub 的 Kafka 主題。CONFLUENT_TOPIC
:您要擷取至 Pub/Sub 的 Kafka 主題名稱。CONFLUENT_IDENTITY_POOL_ID
:用於與 Confluent Cloud 進行驗證的身分集區的集區 ID。PUBSUB_SERVICE_ACCOUNT
:您在「在 Google Cloud 中建立服務帳戶」中建立的服務帳戶。
Go
在嘗試這個範例之前,請先按照 Pub/Sub 快速入門:使用用戶端程式庫中的操作說明設定 Go。詳情請參閱 Pub/Sub Go API 參考說明文件。
如要向 Pub/Sub 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。
Java
在嘗試這個範例之前,請先按照 Pub/Sub 快速入門:使用用戶端程式庫中的操作說明設定 Java。詳情請參閱 Pub/Sub Java API 參考說明文件。
如要向 Pub/Sub 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。
Node.js
在嘗試這個範例之前,請先按照 Pub/Sub 快速入門:使用用戶端程式庫中的操作說明設定 Node.js。詳情請參閱 Pub/Sub Node.js API 參考說明文件。
如要向 Pub/Sub 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。
Python
在嘗試這個範例之前,請先按照 Pub/Sub 快速入門:使用用戶端程式庫中的操作說明設定 Python。詳情請參閱 Pub/Sub Python API 參考說明文件。
如要向 Pub/Sub 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。
C++
在嘗試這個範例之前,請先按照 Pub/Sub 快速入門:使用用戶端程式庫中的操作說明設定 C++。詳情請參閱 Pub/Sub C++ API 參考說明文件。
如要向 Pub/Sub 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。
Node.js (TypeScript)
在嘗試這個範例之前,請先按照 Pub/Sub 快速入門:使用用戶端程式庫中的操作說明設定 Node.js 環境。 詳情請參閱 Pub/Sub Node.js API 參考說明文件。
如要向 Pub/Sub 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。
如果發生問題,請參閱「排解 Confluent Cloud 匯入主題的問題」。
編輯 Confluent Cloud Hubs 匯入主題
如要編輯 Confluent Cloud 匯入主題的擷取資料來源設定,請按照下列步驟操作:
控制台
前往 Google Cloud 控制台的「Topics」頁面。
按一下 Confluent Cloud 匯入主題。
在主題詳細資料頁面中,按一下「編輯」。
更新要變更的欄位。
按一下「更新」。
gcloud
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
為避免遺失匯入主題的設定,請務必在每次更新主題時都加入所有設定。如果您未提供任何內容,Pub/Sub 會將設定重設為原始預設值。
使用下列範例中提到的所有標記,執行
gcloud pubsub topics update
指令:gcloud pubsub topics update TOPIC_ID \ --confluent-cloud-ingestion-bootstrap-server CONFLUENT_BOOTSTRAP_SERVER \ --confluent-cloud-ingestion-cluster-id CONFLUENT_CLUSTER_ID \ --confluent-cloud-ingestion-topic CONFLUENT_TOPIC \ --confluent-cloud-ingestion-identity-pool-id CONFLUENT_IDENTITY_POOL_ID \ --confluent-cloud-ingestion-service-account PUBSUB_SERVICE_ACCOUNT
更改下列內容:
TOPIC_ID
:Pub/Sub 主題的名稱或 ID。CONFLUENT_BOOTSTRAP_SERVER
:叢集的 Bootstrap 伺服器,其中包含要擷取至 Pub/Sub 的 Kafka 主題。格式如下:hostname:port
。CONFLUENT_CLUSTER_ID
:叢集 ID,其中包含要擷取至 Pub/Sub 的 Kafka 主題CONFLUENT_TOPIC
:您要擷取至 Pub/Sub 的 Kafka 主題名稱。CONFLUENT_IDENTITY_POOL_ID
:用於驗證 Confluent Cloud 的身分驗證集區 ID。CONFLUENT_IDENTITY_POOL_ID
:您在「在 Google Cloud 中建立服務帳戶」中建立的服務帳戶。
配額與限制
匯入主題的發布者傳送量受限於主題的發布配額。詳情請參閱「Pub/Sub 配額和限制」。
後續步驟
為主題選擇訂閱類型。
瞭解如何將訊息發布至主題。
使用 gcloud CLI、REST API 或用戶端程式庫建立或修改主題。