为主题创建架构

本文档介绍了如何为 Pub/Sub 主题创建架构。

准备工作

在创建架构之前,请完成以下操作:

所需的角色和权限

如需获得创建和管理架构所需的权限,请让您的管理员为您授予项目的 Pub/Sub Editor (roles/pubsub.editor) IAM 角色。 如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

此预定义角色包含创建和管理架构所需的权限。如需查看所需的确切权限,请展开所需权限部分:

所需权限

如需创建和管理架构,需要具备以下权限:

  • 创建架构: pubsub.schemas.create
  • 将架构附加到主题: pubsub.schemas.attach
  • 提交架构修订版本: pubsub.schemas.commit
  • 删除架构或架构修订版本: pubsub.schemas.delete
  • 获取架构或架构修订版本: pubsub.schemas.get
  • 列表架构: pubsub.schemas.list
  • 列出架构修订版本: pubsub.schemas.listRevisions
  • 回滚架构: pubsub.schemas.rollback
  • 验证消息: pubsub.schemas.validate
  • 获取架构的 IAM 政策: pubsub.schemas.getIamPolicy
  • 为架构配置 IAM 政策 pubsub.schemas.setIamPolicy

您也可以使用自定义角色或其他预定义角色来获取这些权限。

您可以向主账号(例如用户、群组、网域或服务账号)授予角色和权限。您可以在一个项目中创建架构,并将其附加到位于其他项目中的主题。确保您对每个项目拥有所需的权限。

创建架构

您可以使用 Google Cloud 控制台、gcloud CLI、Pub/Sub API 或 Cloud 客户端库创建架构。

在创建架构之前,请查看有关架构的重要信息

控制台

如需创建架构,请按照以下步骤操作:

  1. 在 Google Cloud 控制台中,前往 Pub/Sub 架构页面。

    前往“架构”

  2. 点击创建架构

  3. 架构 ID 字段中,输入架构的 ID。

    如需了解架构命名准则,请参阅主题、订阅或快照命名指南

  4. 对于架构类型,请选择 Avro 或协议缓冲区。

    详细了解架构类型

  5. 架构定义字段中,为您的架构输入 Avro 或协议缓冲区定义。

    例如,下面是 Avro 中的架构示例。

    {
      "type": "record",
      "name": "Avro",
      "fields": [
        {
          "name": "ProductName",
          "type": "string",
          "default": ""
        },
        {
          "name": "SKU",
          "type": "int",
          "default": 0
        },
        {
          "name": "InStock",
          "type": "boolean",
          "default": false
        }
      ]
    }
    
    
  6. 可选:点击验证定义,检查架构定义是否正确。

    验证检查不会检查架构与要发布的消息的兼容性。请在下一步中测试消息。

  7. 可选:您可以测试是否发布了架构正确的消息。

    1. 点击测试消息

    2. 测试邮件窗口中,选择一种邮件编码

    3. 消息正文中,输入测试消息。

    4. 点击测试

      例如,下面是测试架构的消息示例。 在此示例中,选择 Message encoding(消息编码)为 JSON

      {"ProductName":"GreenOnions", "SKU":34543, "InStock":true}
      
    5. 退出“测试消息”页面。

  8. 点击创建以保存架构。

gcloud

gcloud pubsub schemas create SCHEMA_ID \
        --type=SCHEMA_TYPE \
        --definition=SCHEMA_DEFINITION

其中:

  • SCHEMA_TYPEavroprotocol-buffer
  • SCHEMA_DEFINITION 是包含架构定义的 string,系统会根据所选架构类型设置其格式。

您还可以在文件中指定架构定义:

gcloud pubsub schemas create SCHEMA_ID \
        --type=SCHEMA_TYPE \
        --definition-file=SCHEMA_DEFINITION_FILE

其中:

  • SCHEMA_TYPEavroprotocol-buffer
  • SCHEMA_DEFINITION_FILE 是包含架构定义文件路径的 string,系统会根据所选架构类型设置其格式。

REST

要创建架构,请发送如下所示的 POST 请求:

POST https://pubsub.googleapis.com/v1/projects/PROJECT_ID/schemas/SCHEMA_ID
Authorization: Bearer $(gcloud auth application-default print-access-token)
Content-Type: application/json --data @response-body.json

在请求正文中指定以下字段:

{
  "definition": SCHEMA_DEFINITION
  "type": SCHEMA_TYPE
}

其中:

  • SCHEMA_TYPEavroprotocol-buffer
  • SCHEMA_DEFINITION 是包含架构定义的字符串,系统会根据所选架构类型设置其格式。

响应正文应包含架构资源的 JSON 表示法。例如:

{
  "name": SCHEMA_NAME,
  "type": SCHEMA_TYPE,
  "definition": SCHEMA_DEFINITION
  "revisionId": REVISION_ID
  "revisionCreateTime": REVISION_CREATE_TIME
}

其中:

  • REVISION_ID 是服务器为修订版本生成的 ID。
  • REVISION_CREATE_TIME 是修订版本的创建时间戳,采用 ISO 8601 格式。

C++

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档

Avro

namespace pubsub = ::google::cloud::pubsub;
[](pubsub::SchemaServiceClient client, std::string const& project_id,
   std::string const& schema_id, std::string const& schema_definition_file) {
  std::string const definition = ReadFile(schema_definition_file);

  google::pubsub::v1::CreateSchemaRequest request;
  request.set_parent(google::cloud::Project(project_id).FullName());
  request.set_schema_id(schema_id);
  request.mutable_schema()->set_type(google::pubsub::v1::Schema::AVRO);
  request.mutable_schema()->set_definition(definition);
  auto schema = client.CreateSchema(request);
  if (schema.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The schema already exists\n";
    return;
  }
  if (!schema) throw std::move(schema).status();

  std::cout << "Schema successfully created: " << schema->DebugString()
            << "\n";
}

Proto

namespace pubsub = ::google::cloud::pubsub;
[](pubsub::SchemaServiceClient client, std::string const& project_id,
   std::string const& schema_id, std::string const& schema_definition_file) {
  std::string const definition = ReadFile(schema_definition_file);

  google::pubsub::v1::CreateSchemaRequest request;
  request.set_parent(google::cloud::Project(project_id).FullName());
  request.set_schema_id(schema_id);
  request.mutable_schema()->set_type(
      google::pubsub::v1::Schema::PROTOCOL_BUFFER);
  request.mutable_schema()->set_definition(definition);
  auto schema = client.CreateSchema(request);
  if (schema.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The schema already exists\n";
    return;
  }
  if (!schema) throw std::move(schema).status();
  std::cout << "Schema successfully created: " << schema->DebugString()
            << "\n";
}

C#

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档

Avro


using Google.Api.Gax.ResourceNames;
using Google.Cloud.PubSub.V1;
using Grpc.Core;
using System;
using System.IO;

public class CreateAvroSchemaSample
{
    public Schema CreateAvroSchema(string projectId, string schemaId, string pathToDefinition)
    {
        SchemaServiceClient schemaService = SchemaServiceClient.Create();
        var schemaName = SchemaName.FromProjectSchema(projectId, schemaId);
        string schemaDefinition = File.ReadAllText(pathToDefinition);
        Schema schema = new Schema
        {
            SchemaName = schemaName,
            Type = Schema.Types.Type.Avro,
            Definition = schemaDefinition
        };
        CreateSchemaRequest createSchemaRequest = new CreateSchemaRequest
        {
            ParentAsProjectName = ProjectName.FromProject(projectId),
            SchemaId = schemaId,
            Schema = schema
        };

        try
        {
            schema = schemaService.CreateSchema(createSchemaRequest);
            Console.WriteLine($"Schema {schema.Name} created.");
        }
        catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
        {
            Console.WriteLine($"Schema {schemaName} already exists.");
        }
        return schema;
    }
}

Proto


using Google.Api.Gax.ResourceNames;
using Google.Cloud.PubSub.V1;
using Grpc.Core;
using System;
using System.IO;

public class CreateProtoSchemaSample
{
    public Schema CreateProtoSchema(string projectId, string schemaId, string pathToDefinition)
    {
        SchemaServiceClient schemaService = SchemaServiceClient.Create();
        var schemaName = SchemaName.FromProjectSchema(projectId, schemaId);
        string schemaDefinition = File.ReadAllText(pathToDefinition);
        Schema schema = new Schema
        {
            SchemaName = schemaName,
            Type = Schema.Types.Type.ProtocolBuffer,
            Definition = schemaDefinition
        };
        CreateSchemaRequest createSchemaRequest = new CreateSchemaRequest
        {
            ParentAsProjectName = ProjectName.FromProject(projectId),
            SchemaId = schemaId,
            Schema = schema
        };

        try
        {
            schema = schemaService.CreateSchema(createSchemaRequest);
            Console.WriteLine($"Schema {schema.Name} created.");
        }
        catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
        {
            Console.WriteLine($"Schema {schemaName} already exists.");
        }
        return schema;
    }
}

Go

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档

Avro

import (
	"context"
	"fmt"
	"io"
	"os"

	"cloud.google.com/go/pubsub"
)

// createAvroSchema creates a schema resource from a JSON-formatted Avro schema file.
func createAvroSchema(w io.Writer, projectID, schemaID, avscFile string) error {
	// projectID := "my-project-id"
	// schemaID := "my-schema"
	// avscFile = "path/to/an/avro/schema/file(.avsc)/formatted/in/json"
	ctx := context.Background()
	client, err := pubsub.NewSchemaClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewSchemaClient: %w", err)
	}
	defer client.Close()

	avscSource, err := os.ReadFile(avscFile)
	if err != nil {
		return fmt.Errorf("error reading from file: %s", avscFile)
	}

	config := pubsub.SchemaConfig{
		Type:       pubsub.SchemaAvro,
		Definition: string(avscSource),
	}
	s, err := client.CreateSchema(ctx, schemaID, config)
	if err != nil {
		return fmt.Errorf("CreateSchema: %w", err)
	}
	fmt.Fprintf(w, "Schema created: %#v\n", s)
	return nil
}

Proto

import (
	"context"
	"fmt"
	"io"
	"os"

	"cloud.google.com/go/pubsub"
)

// createProtoSchema creates a schema resource from a schema proto file.
func createProtoSchema(w io.Writer, projectID, schemaID, protoFile string) error {
	// projectID := "my-project-id"
	// schemaID := "my-schema"
	// protoFile = "path/to/a/proto/schema/file(.proto)/formatted/in/protocol/buffers"
	ctx := context.Background()
	client, err := pubsub.NewSchemaClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewSchemaClient: %w", err)
	}
	defer client.Close()

	protoSource, err := os.ReadFile(protoFile)
	if err != nil {
		return fmt.Errorf("error reading from file: %s", protoFile)
	}

	config := pubsub.SchemaConfig{
		Type:       pubsub.SchemaProtocolBuffer,
		Definition: string(protoSource),
	}
	s, err := client.CreateSchema(ctx, schemaID, config)
	if err != nil {
		return fmt.Errorf("CreateSchema: %w", err)
	}
	fmt.Fprintf(w, "Schema created: %#v\n", s)
	return nil
}

Java

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档

Avro


import com.google.api.gax.rpc.AlreadyExistsException;
import com.google.cloud.pubsub.v1.SchemaServiceClient;
import com.google.pubsub.v1.ProjectName;
import com.google.pubsub.v1.Schema;
import com.google.pubsub.v1.SchemaName;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class CreateAvroSchemaExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String schemaId = "your-schema-id";
    String avscFile = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json";

    createAvroSchemaExample(projectId, schemaId, avscFile);
  }

  public static Schema createAvroSchemaExample(String projectId, String schemaId, String avscFile)
      throws IOException {

    ProjectName projectName = ProjectName.of(projectId);
    SchemaName schemaName = SchemaName.of(projectId, schemaId);

    // Read an Avro schema file formatted in JSON as a string.
    String avscSource = new String(Files.readAllBytes(Paths.get(avscFile)));

    try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) {

      Schema schema =
          schemaServiceClient.createSchema(
              projectName,
              Schema.newBuilder()
                  .setName(schemaName.toString())
                  .setType(Schema.Type.AVRO)
                  .setDefinition(avscSource)
                  .build(),
              schemaId);

      System.out.println("Created a schema using an Avro schema:\n" + schema);
      return schema;
    } catch (AlreadyExistsException e) {
      System.out.println(schemaName + "already exists.");
      return null;
    }
  }
}

Proto


import com.google.api.gax.rpc.AlreadyExistsException;
import com.google.cloud.pubsub.v1.SchemaServiceClient;
import com.google.pubsub.v1.ProjectName;
import com.google.pubsub.v1.Schema;
import com.google.pubsub.v1.SchemaName;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class CreateProtoSchemaExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String schemaId = "your-schema-id";
    String protoFile = "path/to/a/proto/file/(.proto)/formatted/in/protocol/buffers";

    createProtoSchemaExample(projectId, schemaId, protoFile);
  }

  public static Schema createProtoSchemaExample(String projectId, String schemaId, String protoFile)
      throws IOException {

    ProjectName projectName = ProjectName.of(projectId);
    SchemaName schemaName = SchemaName.of(projectId, schemaId);

    // Read a proto file as a string.
    String protoSource = new String(Files.readAllBytes(Paths.get(protoFile)));

    try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) {

      Schema schema =
          schemaServiceClient.createSchema(
              projectName,
              Schema.newBuilder()
                  .setName(schemaName.toString())
                  .setType(Schema.Type.PROTOCOL_BUFFER)
                  .setDefinition(protoSource)
                  .build(),
              schemaId);

      System.out.println("Created a schema using a protobuf schema:\n" + schema);
      return schema;
    } catch (AlreadyExistsException e) {
      System.out.println(schemaName + "already exists.");
      return null;
    }
  }
}

Node.js

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

Avro

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const schemaNameOrId = 'YOUR_SCHEMA_NAME_OR_ID';
// const avscFile = 'path/to/an/avro/schema/file/(.avsc)/formatted/in/json';

// Imports the Google Cloud client library
const {PubSub, SchemaTypes} = require('@google-cloud/pubsub');

const fs = require('fs');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createAvroSchema(schemaNameOrId, avscFile) {
  const definition = fs.readFileSync(avscFile).toString();
  const schema = await pubSubClient.createSchema(
    schemaNameOrId,
    SchemaTypes.Avro,
    definition
  );

  const name = await schema.getName();
  console.log(`Schema ${name} created.`);
}

Proto

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const schemaNameOrId = 'YOUR_SCHEMA_NAME_OR_ID';
// const protoFile = 'path/to/a/proto/schema/file/(.proto)/formatted/in/protcol/buffers';

// Imports the Google Cloud client library
const {PubSub, SchemaTypes} = require('@google-cloud/pubsub');

const fs = require('fs');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createProtoSchema(schemaNameOrId, protoFile) {
  const definition = fs.readFileSync(protoFile).toString();
  const schema = await pubSubClient.createSchema(
    schemaNameOrId,
    SchemaTypes.ProtocolBuffer,
    definition
  );

  const fullName = await schema.getName();
  console.log(`Schema ${fullName} created.`);
}

Node.js

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

Avro

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const schemaNameOrId = 'YOUR_SCHEMA_NAME_OR_ID';
// const avscFile = 'path/to/an/avro/schema/file/(.avsc)/formatted/in/json';

// Imports the Google Cloud client library
import {PubSub, SchemaTypes} from '@google-cloud/pubsub';

import * as fs from 'fs';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createAvroSchema(schemaNameOrId: string, avscFile: string) {
  const definition: string = fs.readFileSync(avscFile).toString();
  const schema = await pubSubClient.createSchema(
    schemaNameOrId,
    SchemaTypes.Avro,
    definition
  );

  const name = await schema.getName();
  console.log(`Schema ${name} created.`);
}

Proto

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const schemaNameOrId = 'YOUR_SCHEMA_NAME_OR_ID';
// const protoFile = 'path/to/a/proto/schema/file/(.proto)/formatted/in/protcol/buffers';

// Imports the Google Cloud client library
import {PubSub, SchemaTypes} from '@google-cloud/pubsub';

import * as fs from 'fs';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createProtoSchema(schemaNameOrId: string, protoFile: string) {
  const definition: string = fs.readFileSync(protoFile).toString();
  const schema = await pubSubClient.createSchema(
    schemaNameOrId,
    SchemaTypes.ProtocolBuffer,
    definition
  );

  const fullName: string = await schema.getName();
  console.log(`Schema ${fullName} created.`);
}

PHP

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 PHP 设置说明进行操作。如需了解详情,请参阅 Pub/Sub PHP API 参考文档

Avro

use Google\Cloud\PubSub\PubSubClient;

/**
 * Create a Schema with an AVRO definition.
 *
 * @param string $projectId
 * @param string $schemaId
 * @param string $avscFile
 */
function create_avro_schema(string $projectId, string $schemaId, string $avscFile): void
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $definition = (string) file_get_contents($avscFile);
    $schema = $pubsub->createSchema($schemaId, 'AVRO', $definition);

    printf('Schema %s created.', $schema->name());
}

Proto

use Google\Cloud\PubSub\PubSubClient;

/**
 * Create a Schema with an Protocol Buffer definition.
 *
 * @param string $projectId
 * @param string $schemaId
 * @param string $protoFile
 */
function create_proto_schema($projectId, $schemaId, $protoFile)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $definition = (string) file_get_contents($protoFile);
    $schema = $pubsub->createSchema($schemaId, 'PROTOCOL_BUFFER', $definition);

    printf('Schema %s created.', $schema->name());
}

Python

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档

Avro

from google.api_core.exceptions import AlreadyExists
from google.cloud.pubsub import SchemaServiceClient
from google.pubsub_v1.types import Schema

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# schema_id = "your-schema-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"

project_path = f"projects/{project_id}"

# Read a JSON-formatted Avro schema file as a string.
with open(avsc_file, "rb") as f:
    avsc_source = f.read().decode("utf-8")

schema_client = SchemaServiceClient()
schema_path = schema_client.schema_path(project_id, schema_id)
schema = Schema(name=schema_path, type_=Schema.Type.AVRO, definition=avsc_source)

try:
    result = schema_client.create_schema(
        request={"parent": project_path, "schema": schema, "schema_id": schema_id}
    )
    print(f"Created a schema using an Avro schema file:\n{result}")
    return result
except AlreadyExists:
    print(f"{schema_id} already exists.")

Proto

from google.api_core.exceptions import AlreadyExists
from google.cloud.pubsub import SchemaServiceClient
from google.pubsub_v1.types import Schema

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# schema_id = "your-schema-id"
# proto_file = "path/to/a/proto/file/(.proto)/formatted/in/protocol/buffers"

project_path = f"projects/{project_id}"

# Read a protobuf schema file as a string.
with open(proto_file, "rb") as f:
    proto_source = f.read().decode("utf-8")

schema_client = SchemaServiceClient()
schema_path = schema_client.schema_path(project_id, schema_id)
schema = Schema(
    name=schema_path, type_=Schema.Type.PROTOCOL_BUFFER, definition=proto_source
)

try:
    result = schema_client.create_schema(
        request={"parent": project_path, "schema": schema, "schema_id": schema_id}
    )
    print(f"Created a schema using a protobuf schema file:\n{result}")
    return result
except AlreadyExists:
    print(f"{schema_id} already exists.")

Ruby

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Ruby 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Ruby API 参考文档

Avro

# schema_id = "your-schema-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"

pubsub = Google::Cloud::Pubsub.new

definition = File.read avsc_file
schema = pubsub.create_schema schema_id, :avro, definition

puts "Schema #{schema.name} created."

Proto

# schema_id = "your-schema-id"
# proto_file = "path/to/a/proto/file/(.proto)/formatted/in/protocol/buffers"

pubsub = Google::Cloud::Pubsub.new

definition = File.read proto_file
schema = pubsub.create_schema schema_id, :protocol_buffer, definition

puts "Schema #{schema.name} created."

创建架构后,您可以在架构页面中查看架构的详细信息

您可以将架构与主题相关联

后续步骤