Eseguire il commit di una revisione dello schema

Questo documento mostra come eseguire il commit di una revisione dello schema per gli argomenti Pub/Sub.

Prima di iniziare

Ruoli e autorizzazioni richiesti

Per ottenere le autorizzazioni necessarie per eseguire il commit di una revisione dello schema e gestire gli schemi, chiedi all'amministratore di concederti il ruolo IAM Editor Pub/Sub (roles/pubsub.editor) nel progetto. Per saperne di più sulla concessione dei ruoli, consulta Gestire l'accesso a progetti, cartelle e organizzazioni.

Questo ruolo predefinito contiene le autorizzazioni necessarie per eseguire il commit di una revisione dello schema e gestire gli schemi. Per visualizzare le autorizzazioni esatte richieste, espandi la sezione Autorizzazioni richieste:

Autorizzazioni obbligatorie

Per eseguire il commit di una revisione dello schema e gestire gli schemi sono necessarie le seguenti autorizzazioni:

  • Crea schema: pubsub.schemas.create
  • Allega lo schema all'argomento: pubsub.schemas.attach
  • Esegui il commit di una revisione dello schema: pubsub.schemas.commit
  • Elimina uno schema o una revisione dello schema: pubsub.schemas.delete
  • Ottieni uno schema o le revisioni dello schema: pubsub.schemas.get
  • Elenca schemi: pubsub.schemas.list
  • Revisioni dello schema dell'elenco: pubsub.schemas.listRevisions
  • Esegui il rollback di uno schema: pubsub.schemas.rollback
  • Convalida un messaggio: pubsub.schemas.validate
  • Recupera il criterio IAM per uno schema: pubsub.schemas.getIamPolicy
  • Configura il criterio IAM per uno schema: pubsub.schemas.setIamPolicy

Potresti anche ottenere queste autorizzazioni con ruoli personalizzati o altri ruoli predefiniti.

Puoi concedere ruoli e autorizzazioni a entità come utenti, gruppi, domini o account di servizio. Puoi creare uno schema in un progetto e agganciarlo a un argomento in un altro progetto. Assicurati di disporre delle autorizzazioni necessarie per ogni progetto.

Rivedi uno schema

Puoi eseguire il commit di una revisione dello schema utilizzando la console Google Cloud, l'interfaccia alla gcloud CLI, l'API Pub/Sub o le librerie client di Cloud.

Di seguito sono riportate alcune linee guida per eseguire il commit di una revisione dello schema:

  • Puoi rivedere uno schema entro vincoli specifici:

    • Per gli schemi Protocol Buffer, puoi aggiungere o rimuovere campi facoltativi. Non puoi aggiungere o eliminare altri campi. Inoltre, non puoi modificare alcun campo esistente.

    • Per gli schemi Avro, consulta la documentazione di Avro per le regole relative alla risoluzione dello schema. Una nuova revisione deve seguire le regole come se fosse sia lo schema del lettore che lo schema dello scrittore.

    • Uno schema può avere un massimo di 20 revisioni contemporaneamente. Se superi il limite, elimina una revisione dello schema prima di crearne un'altra.

  • A ogni revisione è associato un ID revisione univoco. L'ID revisione è un UUID di otto caratteri generato automaticamente.

  • Quando aggiorni l'intervallo di revisione o la revisione di uno schema utilizzato per la convalida degli argomenti, potrebbero essere necessari alcuni minuti prima che le modifiche diventino effettive.

Console

Per creare una revisione dello schema:

  1. Nella console Google Cloud, vai alla pagina Schemi Pub/Sub.

    Vai a Schemi

  2. Fai clic sull'ID schema di uno schema esistente.

    Viene visualizzata la pagina Dettagli schema dello schema.

  3. Fai clic su Crea revisione.

    Viene visualizzata la pagina Crea revisione dello schema.

  4. Apporta le modifiche necessarie.

    Ad esempio, per lo schema di esempio in Avro che hai creato in Creare uno schema, puoi aggiungere un altro campo facoltativo chiamato Price come segue:

     {
       "type": "record",
       "name": "Avro",
       "fields": [
         {
           "name": "ProductName",
           "type": "string",
           "default": ""
         },
         {
           "name": "SKU",
           "type": "int",
           "default": 0
         },
         {
           "name": "InStock",
           "type": "boolean",
           "default": false
         },
         {
           "name": "Price",
           "type": "double",
           "default": "0.0"
         }
       ]
     }
    
  5. Fai clic su Convalida definizione per verificare che la definizione dello schema sia corretta.

  6. Puoi anche convalidare i messaggi per lo schema.

    1. Fai clic su Test messaggio per testare un messaggio di esempio.

    2. Nella finestra Test messaggio, seleziona un tipo di codifica del messaggio.

    3. In Test message (Messaggio di test), inserisci un messaggio di test.

      Ad esempio, di seguito è riportato un messaggio di esempio per lo schema di test. In questo esempio, seleziona Codifica dei messaggi come JSON.

      {"ProductName":"GreenOnions", "SKU":34543, "Price":12, "InStock":true}
      
    4. Fai clic su Test.

  7. Fai clic su Commit per salvare lo schema.

gcloud

gcloud pubsub schemas commit SCHEMA_ID \
        --type=SCHEMA_TYPE \
        --definition=SCHEMA_DEFINITION

Dove:

  • SCHEMA_TYPE è avro o protocol-buffer.
  • SCHEMA_DEFINITION è un string contenente la definizione dello schema, formattata in base al tipo di schema scelto.

Puoi anche specificare la definizione dello schema nel file:

gcloud pubsub schemas commit SCHEMA_ID \
        --type=SCHEMA_TYPE \
        --definition-file=SCHEMA_DEFINITION_FILE

Dove:

  • SCHEMA_TYPE è avro o protocol-buffer.
  • SCHEMA_DEFINITION_FILE è un string contenente il percorso del file con la definizione dello schema, formattato in base al tipo di schema scelto.

REST

Per eseguire il commit di una revisione dello schema, invia una richiesta POST come la seguente:

POST https://pubsub.googleapis.com/v1/projects/PROJECT_ID/schemas/SCHEMA_ID:commit
Authorization: Bearer $(gcloud auth application-default print-access-token)
Content-Type: application/json --data @response-body.json

Specifica i seguenti campi nel corpo della richiesta:

{
  "definition": SCHEMA_DEFINITION
  "type": SCHEMA_TYPE
  "name": SCHEMA_NAME
}

Dove:

  • SCHEMA_TYPE è AVRO o PROTOCOL_BUFFER.
  • SCHEMA_DEFINITION è una stringa contenente la definizione dello schema, formattata in base al tipo di schema scelto.
  • SCHEMA_NAME è il nome di uno schema esistente.

Il corpo della risposta deve contenere una rappresentazione JSON di una risorsa schema. Ad esempio:

{
  "name": SCHEMA_NAME,
  "type": SCHEMA_TYPE,
  "definition": SCHEMA_DEFINITION
  "revisionId": REVISION_ID
  "revisionCreateTime": REVISION_CREATE_TIME
}

Dove:

  • REVISION_ID è l'ID generato dal server per la revisione.
  • REVISION_CREATE_TIME è il timestamp ISO 8601 in cui è stata creata la revisione.

Vai

Prima di provare questo esempio, segui le istruzioni di configurazione di Go riportate nella guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub Go.

Avro

import (
	"context"
	"fmt"
	"io"
	"io/ioutil"

	"cloud.google.com/go/pubsub"
)

// commitAvroSchema commits a new avro schema revision to an existing schema.
func commitAvroSchema(w io.Writer, projectID, schemaID, avscFile string) error {
	// projectID := "my-project-id"
	// schemaID := "my-schema-id"
	// avscFile = "path/to/an/avro/schema/file(.avsc)/formatted/in/json"
	ctx := context.Background()
	client, err := pubsub.NewSchemaClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewSchemaClient: %w", err)
	}
	defer client.Close()

	// Read an Avro schema file formatted in JSON as a byte slice.
	avscSource, err := ioutil.ReadFile(avscFile)
	if err != nil {
		return fmt.Errorf("error reading from file: %s", avscFile)
	}

	config := pubsub.SchemaConfig{
		Name:       fmt.Sprintf("projects/%s/schemas/%s", projectID, schemaID),
		Type:       pubsub.SchemaAvro,
		Definition: string(avscSource),
	}
	s, err := client.CommitSchema(ctx, schemaID, config)
	if err != nil {
		return fmt.Errorf("CommitSchema: %w", err)
	}
	fmt.Fprintf(w, "Committed a schema using an Avro schema: %#v\n", s)
	return nil
}

Proto

import (
	"context"
	"fmt"
	"io"
	"io/ioutil"

	"cloud.google.com/go/pubsub"
)

// commitProtoSchema commits a new proto schema revision to an existing schema.
func commitProtoSchema(w io.Writer, projectID, schemaID, protoFile string) error {
	// projectID := "my-project-id"
	// schemaID := "my-schema"
	// protoFile = "path/to/a/proto/schema/file(.proto)/formatted/in/protocol/buffers"
	ctx := context.Background()
	client, err := pubsub.NewSchemaClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewSchemaClient: %w", err)
	}
	defer client.Close()

	// Read a proto file as a byte slice.
	protoSource, err := ioutil.ReadFile(protoFile)
	if err != nil {
		return fmt.Errorf("error reading from file: %s", protoFile)
	}

	config := pubsub.SchemaConfig{
		Name:       fmt.Sprintf("projects/%s/schemas/%s", projectID, schemaID),
		Type:       pubsub.SchemaProtocolBuffer,
		Definition: string(protoSource),
	}
	s, err := client.CommitSchema(ctx, schemaID, config)
	if err != nil {
		return fmt.Errorf("CommitSchema: %w", err)
	}
	fmt.Fprintf(w, "Committed a schema using a protobuf schema: %#v\n", s)
	return nil
}

C++

Prima di provare questo esempio, segui le istruzioni di configurazione C++ riportate nella guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub C++.

Avro

namespace pubsub = ::google::cloud::pubsub;
[](pubsub::SchemaServiceClient client, std::string const& project_id,
   std::string const& schema_id, std::string const& schema_definition_file) {
  std::string const definition = ReadFile(schema_definition_file);

  google::pubsub::v1::CommitSchemaRequest request;
  std::string const name =
      google::cloud::pubsub::Schema(project_id, schema_id).FullName();
  request.set_name(name);
  request.mutable_schema()->set_name(name);
  request.mutable_schema()->set_type(google::pubsub::v1::Schema::AVRO);
  request.mutable_schema()->set_definition(definition);
  auto schema = client.CommitSchema(request);
  if (schema.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The schema revision already exists\n";
    return;
  }
  if (!schema) throw std::move(schema).status();

  std::cout << "Schema revision successfully committed: "
            << schema->DebugString() << "\n";
}

Proto

namespace pubsub = ::google::cloud::pubsub;
[](pubsub::SchemaServiceClient client, std::string const& project_id,
   std::string const& schema_id, std::string const& schema_definition_file) {
  std::string const definition = ReadFile(schema_definition_file);

  google::pubsub::v1::CommitSchemaRequest request;
  std::string const name =
      google::cloud::pubsub::Schema(project_id, schema_id).FullName();
  request.set_name(name);
  request.mutable_schema()->set_name(name);
  request.mutable_schema()->set_type(
      google::pubsub::v1::Schema::PROTOCOL_BUFFER);
  request.mutable_schema()->set_definition(definition);
  auto schema = client.CommitSchema(request);
  if (schema.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The schema revision already exists\n";
    return;
  }
  if (!schema) throw std::move(schema).status();

  std::cout << "Schema revision successfully committed: "
            << schema->DebugString() << "\n";
}

Java

Prima di provare questo esempio, segui le istruzioni di configurazione di Java riportate nella Guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Java Pub/Sub.

Avro


import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.pubsub.v1.SchemaServiceClient;
import com.google.pubsub.v1.ProjectName;
import com.google.pubsub.v1.Schema;
import com.google.pubsub.v1.SchemaName;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class CommitAvroSchemaExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String schemaId = "your-schema-id";
    String avscFile = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json";

    commitAvroSchemaExample(projectId, schemaId, avscFile);
  }

  public static Schema commitAvroSchemaExample(String projectId, String schemaId, String avscFile)
      throws IOException {

    ProjectName projectName = ProjectName.of(projectId);
    SchemaName schemaName = SchemaName.of(projectId, schemaId);

    // Read an Avro schema file formatted in JSON as a string.
    String avscSource = new String(Files.readAllBytes(Paths.get(avscFile)));

    try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) {

      Schema schema =
          schemaServiceClient.commitSchema(
              schemaName.toString(),
              Schema.newBuilder()
                  .setName(schemaName.toString())
                  .setType(Schema.Type.AVRO)
                  .setDefinition(avscSource)
                  .build());

      System.out.println("Committed a schema using an Avro schema:\n" + schema);
      return schema;
    } catch (NotFoundException e) {
      System.out.println(schemaName + "does not exist.");
      return null;
    }
  }
}

Proto


import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.pubsub.v1.SchemaServiceClient;
import com.google.pubsub.v1.ProjectName;
import com.google.pubsub.v1.Schema;
import com.google.pubsub.v1.SchemaName;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class CommitProtoSchemaExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String schemaId = "your-schema-id";
    String protoFile = "path/to/a/proto/file/(.proto)/formatted/in/protocol/buffers";

    commitProtoSchemaExample(projectId, schemaId, protoFile);
  }

  public static Schema commitProtoSchemaExample(String projectId, String schemaId, String protoFile)
      throws IOException {

    ProjectName projectName = ProjectName.of(projectId);
    SchemaName schemaName = SchemaName.of(projectId, schemaId);

    // Read a proto file as a string.
    String protoSource = new String(Files.readAllBytes(Paths.get(protoFile)));

    try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) {

      Schema schema =
          schemaServiceClient.commitSchema(
              schemaName.toString(),
              Schema.newBuilder()
                  .setName(schemaName.toString())
                  .setType(Schema.Type.PROTOCOL_BUFFER)
                  .setDefinition(protoSource)
                  .build());

      System.out.println("Committed a schema using a protobuf schema:\n" + schema);
      return schema;
    } catch (NotFoundException e) {
      System.out.println(schemaName + "does not exist.");
      return null;
    }
  }
}

Python

Prima di provare questo esempio, segui le istruzioni di configurazione di Python riportate nella guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub per Python.

Avro

from google.api_core.exceptions import NotFound
from google.cloud.pubsub import SchemaServiceClient
from google.pubsub_v1.types import Schema

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# schema_id = "your-schema-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"

# Read a JSON-formatted Avro schema file as a string.
with open(avsc_file, "rb") as f:
    avsc_source = f.read().decode("utf-8")

schema_client = SchemaServiceClient()
schema_path = schema_client.schema_path(project_id, schema_id)
schema = Schema(name=schema_path, type_=Schema.Type.AVRO, definition=avsc_source)

try:
    result = schema_client.commit_schema(
        request={"schema": schema, "name": schema_path}
    )
    print(f"Committed a schema revision using an Avro schema file:\n{result}")
    return result
except NotFound:
    print(f"{schema_id} does not exist.")

Proto

from google.api_core.exceptions import NotFound
from google.cloud.pubsub import SchemaServiceClient
from google.pubsub_v1.types import Schema

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# schema_id = "your-schema-id"
# proto_file = "path/to/a/proto/file/(.proto)/formatted/in/protocol/buffers"

# Read a protobuf schema file as a string.
with open(proto_file, "rb") as f:
    proto_source = f.read().decode("utf-8")

schema_client = SchemaServiceClient()
schema_path = schema_client.schema_path(project_id, schema_id)
schema = Schema(
    name=schema_path, type_=Schema.Type.PROTOCOL_BUFFER, definition=proto_source
)

try:
    result = schema_client.commit_schema(
        request={"schema": schema, "name": schema_path}
    )
    print(f"Committed a schema revision using a protobuf schema file:\n{result}")
    return result
except NotFound:
    print(f"{schema_id} does not exist.")

Node.js

Prima di provare questo esempio, segui le istruzioni di configurazione di Node.js riportate nella Guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub Node.js.

Avro

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const schemaNameOrId = 'YOUR_SCHEMA_NAME_OR_ID';
// const avscFile = 'path/to/an/avro/schema/file/(.avsc)/formatted/in/json';

// Imports the Google Cloud client library
const {PubSub, SchemaTypes} = require('@google-cloud/pubsub');

const fs = require('fs');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function commitAvroSchema(schemaNameOrId, avscFile) {
  // Get the fully qualified schema name.
  const schema = pubSubClient.schema(schemaNameOrId);
  const name = await schema.getName();

  // Read the new schema definition from storage.
  const definition = fs.readFileSync(avscFile).toString();

  // Use the gapic client to commit the new definition.
  const schemaClient = await pubSubClient.getSchemaClient();
  const [result] = await schemaClient.commitSchema({
    name,
    schema: {
      name,
      type: SchemaTypes.Avro,
      definition,
    },
  });

  console.log(`Schema ${name} committed with revision ${result.revisionId}.`);
}

Proto

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const schemaNameOrId = 'YOUR_SCHEMA_NAME_OR_ID';
// const protoFile = 'path/to/a/proto/schema/file/(.proto)/formatted/in/protcol/buffers';

// Imports the Google Cloud client library
const {PubSub, SchemaTypes} = require('@google-cloud/pubsub');

const fs = require('fs');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function commitProtoSchema(schemaNameOrId, protoFile) {
  // Get the fully qualified schema name.
  const schema = pubSubClient.schema(schemaNameOrId);
  const name = await schema.getName();

  // Read the new schema definition from storage.
  const definition = fs.readFileSync(protoFile).toString();

  // Use the gapic client to commit the new definition.
  const schemaClient = await pubSubClient.getSchemaClient();
  const [result] = await schemaClient.commitSchema({
    name,
    schema: {
      name,
      type: SchemaTypes.ProtocolBuffer,
      definition,
    },
  });

  console.log(`Schema ${name} committed with revision ${result.revisionId}.`);
}

Node.js

Prima di provare questo esempio, segui le istruzioni di configurazione di Node.js riportate nella Guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub Node.js.

Avro

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const schemaNameOrId = 'YOUR_SCHEMA_NAME_OR_ID';
// const avscFile = 'path/to/an/avro/schema/file/(.avsc)/formatted/in/json';

// Imports the Google Cloud client library
import {PubSub, SchemaTypes} from '@google-cloud/pubsub';

import * as fs from 'fs';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function commitAvroSchema(schemaNameOrId: string, avscFile: string) {
  // Get the fully qualified schema name.
  const schema = pubSubClient.schema(schemaNameOrId);
  const name = await schema.getName();

  // Read the new schema definition from storage.
  const definition: string = fs.readFileSync(avscFile).toString();

  // Use the gapic client to commit the new definition.
  const schemaClient = await pubSubClient.getSchemaClient();
  const [result] = await schemaClient.commitSchema({
    name,
    schema: {
      name,
      type: SchemaTypes.Avro,
      definition,
    },
  });

  console.log(`Schema ${name} committed with revision ${result.revisionId}.`);
}

Proto

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const schemaNameOrId = 'YOUR_SCHEMA_NAME_OR_ID';
// const protoFile = 'path/to/a/proto/schema/file/(.proto)/formatted/in/protcol/buffers';

// Imports the Google Cloud client library
import {PubSub, SchemaTypes} from '@google-cloud/pubsub';

import * as fs from 'fs';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function commitProtoSchema(schemaNameOrId: string, protoFile: string) {
  // Get the fully qualified schema name.
  const schema = pubSubClient.schema(schemaNameOrId);
  const name = await schema.getName();

  // Read the new schema definition from storage.
  const definition: string = fs.readFileSync(protoFile).toString();

  // Use the gapic client to commit the new definition.
  const schemaClient = await pubSubClient.getSchemaClient();
  const [result] = await schemaClient.commitSchema({
    name,
    schema: {
      name,
      type: SchemaTypes.ProtocolBuffer,
      definition,
    },
  });

  console.log(`Schema ${name} committed with revision ${result.revisionId}.`);
}

Dopo aver eseguito il commit di una revisione dello schema, puoi visualizzare i dettagli della nuova revisione nella pagina Schemi.

Passaggi successivi