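Create a schema for a topic

This document describes how to create a schema for a Pub/Sub topic.

Before you begin

Before you create a schema, complete the following: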

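Required roles and permissions

To get the permissions that you need to create and manage schemas, ask your administrator to grant you the Pub/Sub Editor (roles/pubsub.editor) IAM role on your project. For more information about granting roles, see "Manage access to projects, folders, and organizations".

This predefined role contains the permissions required to create and manage schemas. The exact permissions are listed in the "Required permissions" section:

Required permissions

The following permissions are required to create and manage schemas: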

  • Create a schema: pubsub.schemas.create
  • Attach a schema to a topic: pubsub.schemas.attach
  • Commit a schema revision: pubsub.schemas.commit
  • Delete a schema or a schema revision: pubsub.schemas.delete
  • Get a schema or a schema revision: pubsub.schemas.get
  • List schemas: pubsub.schemas.list
  • List schema revisions: pubsub.schemas.listRevisions
  • Roll back a schema: pubsub.schemas.rollback
  • Validate a message: pubsub.schemas.validate
  • Get the IAM policy for a schema: pubsub.schemas.getIamPolicy
  • Set the IAM policy for a schema: pubsub.schemas.setIamPolicy

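You might also be able to get these permissions with custom roles or other predefined roles.

You can grant roles and permissions to principals such as users, groups, domains, or service accounts. You can create a schema in one project and attach it to a topic located in a different project. Make sure that you have the required permissions in each project.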
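
When a schema lives in one project and the topic in another, it can help to check up front which permissions are still missing. The following is a minimal sketch of that check, not an official tool. The permission names are copied from the list above; the `REQUIRED_PERMISSIONS` mapping, its short operation keys, and the `missing_permissions` helper are hypothetical names introduced for illustration.

```python
# Permission names come from the list above.
# The mapping keys and the helper below are hypothetical.
REQUIRED_PERMISSIONS = {
    "create": "pubsub.schemas.create",
    "attach": "pubsub.schemas.attach",
    "commit": "pubsub.schemas.commit",
    "delete": "pubsub.schemas.delete",
    "get": "pubsub.schemas.get",
    "list": "pubsub.schemas.list",
    "listRevisions": "pubsub.schemas.listRevisions",
    "rollback": "pubsub.schemas.rollback",
    "validate": "pubsub.schemas.validate",
    "getIamPolicy": "pubsub.schemas.getIamPolicy",
    "setIamPolicy": "pubsub.schemas.setIamPolicy",
}


def missing_permissions(granted, operations):
    """Return the permissions needed for `operations` that `granted` lacks."""
    granted = set(granted)
    needed = [REQUIRED_PERMISSIONS[op] for op in operations]
    return [permission for permission in needed if permission not in granted]


# Example: a principal that can create and get schemas but not attach them.
granted_in_project = {"pubsub.schemas.create", "pubsub.schemas.get"}
print(missing_permissions(granted_in_project, ["create", "attach"]))
```

Run the check once per project, using the permissions granted in that project.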

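Create a schema

You can create a schema using the Google Cloud console, the gcloud CLI, the Pub/Sub API, or the Cloud Client Libraries.

Before you create a schema, review the important information about schemas.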

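Console

To create a schema, follow these steps:

  1. In the Google Cloud console, go to the Pub/Sub schemas page.

    Go to Schemas

  2. Click Create schema.

  3. In the Schema ID field, enter an ID for your schema.

    For guidelines on naming a schema, see "Guidelines to name a topic, subscription, or snapshot".

  4. For Schema type, select either Avro or Protocol Buffer.

    Learn more about schema types.

  5. In the Schema definition field, enter the Avro or Protocol Buffer definition for your schema.

    For example, the following is a sample schema in Avro.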

    {
      "type": "record",
      "name": "Avro",
      "fields": [
        {
          "name": "ProductName",
          "type": "string",
          "default": ""
        },
        {
          "name": "SKU",
          "type": "int",
          "default": 0
        },
        {
          "name": "InStock",
          "type": "boolean",
          "default": false
        }
      ]
    }
    
    
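  6. Optional: Click Validate definition to check whether the schema definition is correct.

    The validation check does not verify that the schema is compatible with the messages to be published. Test messages in the next step.

  7. Optional: You can test whether messages with the correct schema are published.

    1. Click Test message.

    2. In the Test message window, select a type of Message encoding.

    3. In the Message body, enter a test message.

    4. Click Test.

      For example, the following is a sample message for the test schema. In this example, select JSON as the Message encoding.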

      {"ProductName":"GreenOnions", "SKU":34543, "InStock":true}
      
    5. Exit the test message page.

  8. Click Create to save the schema.
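
To make the message test in step 7 more concrete, the following sketch checks a JSON-encoded message against the sample Avro record schema using only the Python standard library. It is an illustration under stated assumptions, not the validator that Pub/Sub runs: the `check_message` helper and the `PYTHON_TYPES` mapping are hypothetical, they cover only the three primitive types used in the sample schema, and they treat a missing field as a problem.

```python
import json

# The sample Avro schema from step 5.
SCHEMA = json.loads("""
{
  "type": "record",
  "name": "Avro",
  "fields": [
    {"name": "ProductName", "type": "string", "default": ""},
    {"name": "SKU", "type": "int", "default": 0},
    {"name": "InStock", "type": "boolean", "default": false}
  ]
}
""")

# Hypothetical mapping; it covers only the primitive types in the sample.
PYTHON_TYPES = {"string": str, "int": int, "boolean": bool}


def check_message(schema, message_json):
    """Return a list of problems; an empty list means the message fits."""
    message = json.loads(message_json)
    problems = []
    for field in schema["fields"]:
        name, avro_type = field["name"], field["type"]
        if name not in message:
            problems.append(f"missing field: {name}")
            continue
        value = message[name]
        # bool is a subclass of int in Python, so reject booleans for "int".
        bool_as_int = avro_type == "int" and isinstance(value, bool)
        if not isinstance(value, PYTHON_TYPES[avro_type]) or bool_as_int:
            problems.append(f"{name}: expected {avro_type}")
    known = {field["name"] for field in schema["fields"]}
    problems += [f"unexpected field: {key}" for key in message if key not in known]
    return problems


# The sample test message from step 7.
sample = '{"ProductName":"GreenOnions", "SKU":34543, "InStock":true}'
print(check_message(SCHEMA, sample))
```

A message whose SKU is sent as the string "34543" would be reported as `SKU: expected int`, which is the kind of mismatch the Test message step is meant to catch before you save the schema.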

gcloud

gcloud pubsub schemas create SCHEMA_ID \
        --type=SCHEMA_TYPE \
        --definition=SCHEMA_DEFINITION

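Where:

  • SCHEMA_TYPE is either avro or protocol-buffer.
  • SCHEMA_DEFINITION is a string containing the definition of the schema, formatted according to the chosen schema type.

You can also specify the schema definition in a file: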

gcloud pubsub schemas create SCHEMA_ID \
        --type=SCHEMA_TYPE \
        --definition-file=SCHEMA_DEFINITION_FILE

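Where:

  • SCHEMA_TYPE is either avro or protocol-buffer.
  • SCHEMA_DEFINITION_FILE is a string containing the path to a file with the definition of the schema, formatted according to the chosen schema type.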
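
If you script schema creation, one option is to assemble the command shown above as an argument list instead of a shell string, which avoids quoting problems with file paths. The sketch below only builds and prints the command. The `build_create_schema_command` helper and the example values `my-schema` and `schema.avsc` are hypothetical; the flags are the ones documented above.

```python
import shlex


def build_create_schema_command(schema_id, schema_type, definition_file):
    """Return the argument list for `gcloud pubsub schemas create`."""
    if schema_type not in ("avro", "protocol-buffer"):
        raise ValueError(f"unsupported schema type: {schema_type}")
    return [
        "gcloud", "pubsub", "schemas", "create", schema_id,
        f"--type={schema_type}",
        f"--definition-file={definition_file}",
    ]


# Hypothetical schema ID and file name, for illustration only.
command = build_create_schema_command("my-schema", "avro", "schema.avsc")
print(shlex.join(command))
```

To run the command from Python, pass the list to `subprocess.run(command, check=True)` on a machine where the gcloud CLI is installed and authenticated.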

REST

To create a schema, send a POST request like the following:

POST https://pubsub.googleapis.com/v1/projects/PROJECT_ID/schemas/SCHEMA_ID
Authorization: Bearer $(gcloud auth application-default print-access-token)
Content-Type: application/json --data @response-body.json

Specify the following fields in the request body:

{
  "definition": SCHEMA_DEFINITION,
  "type": SCHEMA_TYPE
}

Where:

  • SCHEMA_TYPE is either avro or protocol-buffer.
  • SCHEMA_DEFINITION is a string containing the definition of the schema, formatted according to the chosen schema type.
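
Because the definition field is a string, an Avro definition (which is itself JSON) has to be serialized and then embedded, with escaping, inside the outer JSON body. The following sketch shows that double serialization with the standard library. The `build_request_body` helper is hypothetical, and the type value `AVRO` is an assumption borrowed from the enum names in the client library samples later in this document; substitute whatever value your request expects.

```python
import json

# The sample Avro schema from the console steps, as a Python dictionary.
avro_definition = {
    "type": "record",
    "name": "Avro",
    "fields": [
        {"name": "ProductName", "type": "string", "default": ""},
        {"name": "SKU", "type": "int", "default": 0},
        {"name": "InStock", "type": "boolean", "default": False},
    ],
}


def build_request_body(definition, schema_type):
    """Return the JSON request body with the definition embedded as a string."""
    # First dumps(): the Avro schema becomes a JSON string.
    # Second dumps(): that string is escaped inside the outer JSON document.
    return json.dumps({"definition": json.dumps(definition), "type": schema_type})


# "AVRO" is an assumed value, see the note above.
body = build_request_body(avro_definition, "AVRO")
print(body)
```

Decoding the body and then decoding its definition field returns the original Avro schema, which is a quick way to confirm that the escaping is correct.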

The response body should contain a JSON representation of the schema resource. For example:

{
  "name": SCHEMA_NAME,
  "type": SCHEMA_TYPE,
  "definition": SCHEMA_DEFINITION,
  "revisionId": REVISION_ID,
  "revisionCreateTime": REVISION_CREATE_TIME
}

Where:

  • REVISION_ID is the server-generated ID for the revision.
  • REVISION_CREATE_TIME is the ISO 8601 timestamp at which the revision was created.
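
As a rough illustration of reading such a response, the sketch below extracts the revision ID and converts the creation time into a timezone-aware `datetime`. Every value in `response_text` is made up for the example, including the project, schema, and revision IDs, and the `parse_schema_response` helper is hypothetical.

```python
import json
from datetime import datetime, timezone

# Hypothetical response; all values are invented for illustration.
response_text = json.dumps({
    "name": "projects/my-project/schemas/my-schema",
    "type": "AVRO",
    "definition": "{\"type\": \"record\", \"name\": \"Avro\", \"fields\": []}",
    "revisionId": "abcd1234",
    "revisionCreateTime": "2024-01-15T10:30:00Z",
})


def parse_schema_response(text):
    """Return (name, revision ID, creation time) from a schema resource."""
    data = json.loads(text)
    # Older Python versions reject a trailing "Z" in fromisoformat(),
    # so normalize it to an explicit UTC offset first.
    timestamp = data["revisionCreateTime"].replace("Z", "+00:00")
    return data["name"], data["revisionId"], datetime.fromisoformat(timestamp)


name, revision_id, created = parse_schema_response(response_text)
print(name, revision_id, created.astimezone(timezone.utc).isoformat())
```

Timestamps with more than six fractional-second digits may need extra handling on Python versions earlier than 3.11.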

C++

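Before trying this sample, follow the C++ setup instructions in "Quickstart: Using Client Libraries". For more information, see the Pub/Sub C++ API reference documentation.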

Avro

namespace pubsub = ::google::cloud::pubsub;
[](pubsub::SchemaServiceClient client, std::string const& project_id,
   std::string const& schema_id, std::string const& schema_definition_file) {
  std::string const definition = ReadFile(schema_definition_file);

  google::pubsub::v1::CreateSchemaRequest request;
  request.set_parent(google::cloud::Project(project_id).FullName());
  request.set_schema_id(schema_id);
  request.mutable_schema()->set_type(google::pubsub::v1::Schema::AVRO);
  request.mutable_schema()->set_definition(definition);
  auto schema = client.CreateSchema(request);
  if (schema.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The schema already exists\n";
    return;
  }
  if (!schema) throw std::move(schema).status();

  std::cout << "Schema successfully created: " << schema->DebugString()
            << "\n";
}

Proto

namespace pubsub = ::google::cloud::pubsub;
[](pubsub::SchemaServiceClient client, std::string const& project_id,
   std::string const& schema_id, std::string const& schema_definition_file) {
  std::string const definition = ReadFile(schema_definition_file);

  google::pubsub::v1::CreateSchemaRequest request;
  request.set_parent(google::cloud::Project(project_id).FullName());
  request.set_schema_id(schema_id);
  request.mutable_schema()->set_type(
      google::pubsub::v1::Schema::PROTOCOL_BUFFER);
  request.mutable_schema()->set_definition(definition);
  auto schema = client.CreateSchema(request);
  if (schema.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The schema already exists\n";
    return;
  }
  if (!schema) throw std::move(schema).status();
  std::cout << "Schema successfully created: " << schema->DebugString()
            << "\n";
}

C#

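Before trying this sample, follow the C# setup instructions in "Quickstart: Using Client Libraries". For more information, see the Pub/Sub C# API reference documentation.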

Avro


using Google.Api.Gax.ResourceNames;
using Google.Cloud.PubSub.V1;
using Grpc.Core;
using System;
using System.IO;

public class CreateAvroSchemaSample
{
    public Schema CreateAvroSchema(string projectId, string schemaId, string pathToDefinition)
    {
        SchemaServiceClient schemaService = SchemaServiceClient.Create();
        var schemaName = SchemaName.FromProjectSchema(projectId, schemaId);
        string schemaDefinition = File.ReadAllText(pathToDefinition);
        Schema schema = new Schema
        {
            SchemaName = schemaName,
            Type = Schema.Types.Type.Avro,
            Definition = schemaDefinition
        };
        CreateSchemaRequest createSchemaRequest = new CreateSchemaRequest
        {
            ParentAsProjectName = ProjectName.FromProject(projectId),
            SchemaId = schemaId,
            Schema = schema
        };

        try
        {
            schema = schemaService.CreateSchema(createSchemaRequest);
            Console.WriteLine($"Schema {schema.Name} created.");
        }
        catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
        {
            Console.WriteLine($"Schema {schemaName} already exists.");
        }
        return schema;
    }
}

Proto


using Google.Api.Gax.ResourceNames;
using Google.Cloud.PubSub.V1;
using Grpc.Core;
using System;
using System.IO;

public class CreateProtoSchemaSample
{
    public Schema CreateProtoSchema(string projectId, string schemaId, string pathToDefinition)
    {
        SchemaServiceClient schemaService = SchemaServiceClient.Create();
        var schemaName = SchemaName.FromProjectSchema(projectId, schemaId);
        string schemaDefinition = File.ReadAllText(pathToDefinition);
        Schema schema = new Schema
        {
            SchemaName = schemaName,
            Type = Schema.Types.Type.ProtocolBuffer,
            Definition = schemaDefinition
        };
        CreateSchemaRequest createSchemaRequest = new CreateSchemaRequest
        {
            ParentAsProjectName = ProjectName.FromProject(projectId),
            SchemaId = schemaId,
            Schema = schema
        };

        try
        {
            schema = schemaService.CreateSchema(createSchemaRequest);
            Console.WriteLine($"Schema {schema.Name} created.");
        }
        catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
        {
            Console.WriteLine($"Schema {schemaName} already exists.");
        }
        return schema;
    }
}

Go

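Before trying this sample, follow the Go setup instructions in "Quickstart: Using Client Libraries". For more information, see the Pub/Sub Go API reference documentation.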

Avro

import (
	"context"
	"fmt"
	"io"
	"os"

	"cloud.google.com/go/pubsub"
)

// createAvroSchema creates a schema resource from a JSON-formatted Avro schema file.
func createAvroSchema(w io.Writer, projectID, schemaID, avscFile string) error {
	// projectID := "my-project-id"
	// schemaID := "my-schema"
	// avscFile = "path/to/an/avro/schema/file(.avsc)/formatted/in/json"
	ctx := context.Background()
	client, err := pubsub.NewSchemaClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewSchemaClient: %w", err)
	}
	defer client.Close()

	avscSource, err := os.ReadFile(avscFile)
	if err != nil {
		return fmt.Errorf("error reading from file %s: %w", avscFile, err)
	}

	config := pubsub.SchemaConfig{
		Type:       pubsub.SchemaAvro,
		Definition: string(avscSource),
	}
	s, err := client.CreateSchema(ctx, schemaID, config)
	if err != nil {
		return fmt.Errorf("CreateSchema: %w", err)
	}
	fmt.Fprintf(w, "Schema created: %#v\n", s)
	return nil
}

Proto

import (
	"context"
	"fmt"
	"io"
	"os"

	"cloud.google.com/go/pubsub"
)

// createProtoSchema creates a schema resource from a schema proto file.
func createProtoSchema(w io.Writer, projectID, schemaID, protoFile string) error {
	// projectID := "my-project-id"
	// schemaID := "my-schema"
	// protoFile = "path/to/a/proto/schema/file(.proto)/formatted/in/protocol/buffers"
	ctx := context.Background()
	client, err := pubsub.NewSchemaClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewSchemaClient: %w", err)
	}
	defer client.Close()

	protoSource, err := os.ReadFile(protoFile)
	if err != nil {
		return fmt.Errorf("error reading from file %s: %w", protoFile, err)
	}

	config := pubsub.SchemaConfig{
		Type:       pubsub.SchemaProtocolBuffer,
		Definition: string(protoSource),
	}
	s, err := client.CreateSchema(ctx, schemaID, config)
	if err != nil {
		return fmt.Errorf("CreateSchema: %w", err)
	}
	fmt.Fprintf(w, "Schema created: %#v\n", s)
	return nil
}

Java

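Before trying this sample, follow the Java setup instructions in "Quickstart: Using Client Libraries". For more information, see the Pub/Sub Java API reference documentation.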

Avro


import com.google.api.gax.rpc.AlreadyExistsException;
import com.google.cloud.pubsub.v1.SchemaServiceClient;
import com.google.pubsub.v1.ProjectName;
import com.google.pubsub.v1.Schema;
import com.google.pubsub.v1.SchemaName;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class CreateAvroSchemaExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String schemaId = "your-schema-id";
    String avscFile = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json";

    createAvroSchemaExample(projectId, schemaId, avscFile);
  }

  public static Schema createAvroSchemaExample(String projectId, String schemaId, String avscFile)
      throws IOException {

    ProjectName projectName = ProjectName.of(projectId);
    SchemaName schemaName = SchemaName.of(projectId, schemaId);

    // Read an Avro schema file formatted in JSON as a string.
    String avscSource = new String(Files.readAllBytes(Paths.get(avscFile)));

    try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) {

      Schema schema =
          schemaServiceClient.createSchema(
              projectName,
              Schema.newBuilder()
                  .setName(schemaName.toString())
                  .setType(Schema.Type.AVRO)
                  .setDefinition(avscSource)
                  .build(),
              schemaId);

      System.out.println("Created a schema using an Avro schema:\n" + schema);
      return schema;
    } catch (AlreadyExistsException e) {
      System.out.println(schemaName + " already exists.");
      return null;
    }
  }
}

Proto


import com.google.api.gax.rpc.AlreadyExistsException;
import com.google.cloud.pubsub.v1.SchemaServiceClient;
import com.google.pubsub.v1.ProjectName;
import com.google.pubsub.v1.Schema;
import com.google.pubsub.v1.SchemaName;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class CreateProtoSchemaExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String schemaId = "your-schema-id";
    String protoFile = "path/to/a/proto/file/(.proto)/formatted/in/protocol/buffers";

    createProtoSchemaExample(projectId, schemaId, protoFile);
  }

  public static Schema createProtoSchemaExample(String projectId, String schemaId, String protoFile)
      throws IOException {

    ProjectName projectName = ProjectName.of(projectId);
    SchemaName schemaName = SchemaName.of(projectId, schemaId);

    // Read a proto file as a string.
    String protoSource = new String(Files.readAllBytes(Paths.get(protoFile)));

    try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) {

      Schema schema =
          schemaServiceClient.createSchema(
              projectName,
              Schema.newBuilder()
                  .setName(schemaName.toString())
                  .setType(Schema.Type.PROTOCOL_BUFFER)
                  .setDefinition(protoSource)
                  .build(),
              schemaId);

      System.out.println("Created a schema using a protobuf schema:\n" + schema);
      return schema;
    } catch (AlreadyExistsException e) {
      System.out.println(schemaName + " already exists.");
      return null;
    }
  }
}

Node.js

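Before trying this sample, follow the Node.js setup instructions in "Quickstart: Using Client Libraries". For more information, see the Pub/Sub Node.js API reference documentation.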

Avro

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const schemaNameOrId = 'YOUR_SCHEMA_NAME_OR_ID';
// const avscFile = 'path/to/an/avro/schema/file/(.avsc)/formatted/in/json';

// Imports the Google Cloud client library
const {PubSub, SchemaTypes} = require('@google-cloud/pubsub');

const fs = require('fs');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createAvroSchema(schemaNameOrId, avscFile) {
  const definition = fs.readFileSync(avscFile).toString();
  const schema = await pubSubClient.createSchema(
    schemaNameOrId,
    SchemaTypes.Avro,
    definition,
  );

  const name = await schema.getName();
  console.log(`Schema ${name} created.`);
}

Proto

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const schemaNameOrId = 'YOUR_SCHEMA_NAME_OR_ID';
// const protoFile = 'path/to/a/proto/schema/file/(.proto)/formatted/in/protocol/buffers';

// Imports the Google Cloud client library
const {PubSub, SchemaTypes} = require('@google-cloud/pubsub');

const fs = require('fs');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createProtoSchema(schemaNameOrId, protoFile) {
  const definition = fs.readFileSync(protoFile).toString();
  const schema = await pubSubClient.createSchema(
    schemaNameOrId,
    SchemaTypes.ProtocolBuffer,
    definition,
  );

  const fullName = await schema.getName();
  console.log(`Schema ${fullName} created.`);
}

Node.js (TypeScript)

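Before trying this sample, follow the Node.js setup instructions in "Quickstart: Using Client Libraries". For more information, see the Pub/Sub Node.js API reference documentation.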

Avro

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const schemaNameOrId = 'YOUR_SCHEMA_NAME_OR_ID';
// const avscFile = 'path/to/an/avro/schema/file/(.avsc)/formatted/in/json';

// Imports the Google Cloud client library
import {PubSub, SchemaTypes} from '@google-cloud/pubsub';

import * as fs from 'fs';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createAvroSchema(schemaNameOrId: string, avscFile: string) {
  const definition: string = fs.readFileSync(avscFile).toString();
  const schema = await pubSubClient.createSchema(
    schemaNameOrId,
    SchemaTypes.Avro,
    definition,
  );

  const name = await schema.getName();
  console.log(`Schema ${name} created.`);
}

Proto

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const schemaNameOrId = 'YOUR_SCHEMA_NAME_OR_ID';
// const protoFile = 'path/to/a/proto/schema/file/(.proto)/formatted/in/protocol/buffers';

// Imports the Google Cloud client library
import {PubSub, SchemaTypes} from '@google-cloud/pubsub';

import * as fs from 'fs';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createProtoSchema(schemaNameOrId: string, protoFile: string) {
  const definition: string = fs.readFileSync(protoFile).toString();
  const schema = await pubSubClient.createSchema(
    schemaNameOrId,
    SchemaTypes.ProtocolBuffer,
    definition,
  );

  const fullName: string = await schema.getName();
  console.log(`Schema ${fullName} created.`);
}

PHP

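Before trying this sample, follow the PHP setup instructions in "Quickstart: Using Client Libraries". For more information, see the Pub/Sub PHP API reference documentation.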

Avro

use Google\Cloud\PubSub\PubSubClient;

/**
 * Create a Schema with an AVRO definition.
 *
 * @param string $projectId
 * @param string $schemaId
 * @param string $avscFile
 */
function create_avro_schema(string $projectId, string $schemaId, string $avscFile): void
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $definition = (string) file_get_contents($avscFile);
    $schema = $pubsub->createSchema($schemaId, 'AVRO', $definition);

    printf('Schema %s created.', $schema->name());
}

Proto

use Google\Cloud\PubSub\PubSubClient;

/**
 * Create a Schema with a Protocol Buffer definition.
 *
 * @param string $projectId
 * @param string $schemaId
 * @param string $protoFile
 */
function create_proto_schema(string $projectId, string $schemaId, string $protoFile): void
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $definition = (string) file_get_contents($protoFile);
    $schema = $pubsub->createSchema($schemaId, 'PROTOCOL_BUFFER', $definition);

    printf('Schema %s created.', $schema->name());
}

Python

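Before trying this sample, follow the Python setup instructions in "Quickstart: Using Client Libraries". For more information, see the Pub/Sub Python API reference documentation.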

Avro

from google.api_core.exceptions import AlreadyExists
from google.cloud.pubsub import SchemaServiceClient
from google.pubsub_v1.types import Schema

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# schema_id = "your-schema-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"

project_path = f"projects/{project_id}"

# Read a JSON-formatted Avro schema file as a string.
with open(avsc_file, "rb") as f:
    avsc_source = f.read().decode("utf-8")

schema_client = SchemaServiceClient()
schema_path = schema_client.schema_path(project_id, schema_id)
schema = Schema(name=schema_path, type_=Schema.Type.AVRO, definition=avsc_source)

try:
    result = schema_client.create_schema(
        request={"parent": project_path, "schema": schema, "schema_id": schema_id}
    )
    print(f"Created a schema using an Avro schema file:\n{result}")
except AlreadyExists:
    print(f"{schema_id} already exists.")

Proto

from google.api_core.exceptions import AlreadyExists
from google.cloud.pubsub import SchemaServiceClient
from google.pubsub_v1.types import Schema

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# schema_id = "your-schema-id"
# proto_file = "path/to/a/proto/file/(.proto)/formatted/in/protocol/buffers"

project_path = f"projects/{project_id}"

# Read a protobuf schema file as a string.
with open(proto_file, "rb") as f:
    proto_source = f.read().decode("utf-8")

schema_client = SchemaServiceClient()
schema_path = schema_client.schema_path(project_id, schema_id)
schema = Schema(
    name=schema_path, type_=Schema.Type.PROTOCOL_BUFFER, definition=proto_source
)

try:
    result = schema_client.create_schema(
        request={"parent": project_path, "schema": schema, "schema_id": schema_id}
    )
    print(f"Created a schema using a protobuf schema file:\n{result}")
except AlreadyExists:
    print(f"{schema_id} already exists.")

Ruby

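Before trying this sample, follow the Ruby setup instructions in "Quickstart: Using Client Libraries". For more information, see the Pub/Sub Ruby API reference documentation.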

Avro

require "google/cloud/pubsub"

# schema_id = "your-schema-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"

pubsub = Google::Cloud::Pubsub.new

definition = File.read avsc_file
schema = pubsub.create_schema schema_id, :avro, definition

puts "Schema #{schema.name} created."

Proto

require "google/cloud/pubsub"

# schema_id = "your-schema-id"
# proto_file = "path/to/a/proto/file/(.proto)/formatted/in/protocol/buffers"

pubsub = Google::Cloud::Pubsub.new

definition = File.read proto_file
schema = pubsub.create_schema schema_id, :protocol_buffer, definition

puts "Schema #{schema.name} created."

After you create a schema, you can view its details on the Schemas page.

You can associate the schema with a topic.

What's next