针对 Pub/Sub 推送订阅的载荷解封

构建 Pub/Sub 系统时,载荷解封装可帮助您连接到不遵循标准 Pub/Sub 推送端点实现的所有系统要求的其他系统。

载荷解封装的一些可能用例如下:

  • 您不想为 HTTP 推送端点编写特定于 Pub/Sub 的消息解析代码。
  • 您希望以 HTTP 标头的形式接收 Pub/Sub 消息元数据,而不是在 HTTP POST 正文中接收元数据。
  • 您想发送 Pub/Sub 消息并排除 Pub/Sub 元数据,例如在向第三方 API 发送数据时。

载荷解封的工作原理

载荷解封是一项功能,可剥离 Pub/Sub 消息中的所有消息元数据(消息数据除外)。通过发送原始消息数据,订阅者无需遵守 Pub/Sub 的任何系统要求,即可处理消息。

  • 通过载荷展开,消息数据会直接作为 HTTP 正文传送。
  • 如果不解封载荷,Pub/Sub 会传送包含多个消息元数据字段和消息数据字段的 JSON 对象。在这种情况下,必须解析 JSON 以检索消息数据,然后进行 base64 解码。

写入元数据

启用载荷解封后,您可以使用写入元数据选项,将之前移除的消息元数据添加到请求标头中。

  • 已启用元数据写入权限。将消息元数据重新添加到请求标头中。还会传送已解码的原始消息数据。
  • 写入元数据已停用。仅传送已解码的原始消息数据。

写入元数据通过 Pub/Sub、Google Cloud CLI 参数 --push-no-wrapper-write-metadata 和 API 属性 NoWrapper 公开。默认情况下,此值为 null。

准备工作

封装和未封装消息的示例

以下示例说明了发送封装的 HTTP 消息与未封装的 HTTP 消息之间的区别。在这些示例中,消息数据包含字符串 {"status": "Hello there"}

在本示例中,系统会创建一个启用了载荷解封装功能的订阅,并将消息发布到 mytopic。它使用值为 some-key 的排序键,并且媒体类型声明为 application/json

gcloud pubsub topics publish mytopic
   --message='{"status": "Hello there"}'
   --ordering-key="some-key"
   --attribute "Content-Type=application/json"

以下部分介绍了封装消息和未封装消息之间的区别。

换行消息

以下示例展示了标准的 Pub/Sub 封装消息。在本例中,未启用载荷解封装。

发布 推送端点接收
data="{"status": "Hello there"}"
ordering_key="some-key"
attributes=
  {
     {"Content-Type", "application/json"}
  }
Content-Length: 361
Content-Type: application/json
User-Agent: CloudPubSub-Google
Host: subscription-project.uc.r.appspot.com

{
  "message": {
      "attributes": {
          "Content-Type": "application/json"
      },
      "data": "eyJzdGF0dXMiOiAiSGVsbG8gdGhlcmUifQ==", //  Base64 - {"status": "Hello there"}
      "messageId": "2070443601311540",
      "message_id": "2070443601311540",
      "publishTime": "2021-02-26T19:13:55.749Z",
      "publish_time": "2021-02-26T19:13:55.749Z"
  },
  "subscription": "projects/myproject/..."
}

已停用写入元数据的解封消息

以下示例展示了已停用“写入元数据”选项的已取消封装消息。在本例中,不包含 x-goog-pubsub-* 标头和邮件属性

发布 推送端点接收
data="{"status": "Hello there"}"
ordering_key="some-key"
attributes=
  {
     {"Content-Type", "application/json"}
  }
Content-Length: 25
User-Agent: CloudPubSub-Google
Host: subscription-project.uc.r.appspot.com

{"status": "Hello there"}

启用了写入元数据的未封装消息

以下示例展示了启用了“写入元数据”选项的已取消封装消息。在本例中,包含 x-goog-pubsub-* 标头和邮件属性

发布 推送端点接收
data="{"status": "Hello there"}"
ordering_key="some-key"
attributes=
  {
     {"Content-Type", "application/json"}
  }
x-goog-pubsub-subscription-name: "projects/myproject/..."
x-goog-pubsub-message-id: "2070443601311540"
x-goog-pubsub-publish-time: "2021-02-26T19:13:55.749Z"
x-goog-pubsub-ordering-key: "some-key"
Content-Type: application/json
Content-Length: 12
User-Agent: CloudPubSub-Google
Host: subscription-project.uc.r.appspot.com

{"status": "Hello there"}

配置载荷解封

您可以使用 Google Cloud 控制台的订阅详情页面、Google Cloud CLI 或客户端库为订阅启用载荷解封装推送传送。

控制台

  1. 在 Google Cloud 控制台中,进入订阅页面。

    打开 Pub/Sub 订阅

  2. 点击创建订阅

  3. 订阅 ID 字段中,输入名称。

    如需了解如何命名订阅,请参阅主题或订阅命名指南

  4. 从下拉菜单中选择一个主题。订阅将接收来自该主题的消息。

  5. 对于传送类型,选择推送

  6. 如需启用载荷解封,请选择启用载荷解封

  7. (可选)如需在请求标头中保留消息的元数据,请选择写入元数据。您必须启用此选项,才能为邮件设置 Content-Type 标头

  8. 指定端点网址。

  9. 保留所有其他默认值。

  10. 点击创建

gcloud

如需配置包含标准 HTTP 标头的载荷解封装订阅,请运行以下 gcloud pubsub subscriptions create 命令:

gcloud pubsub subscriptions create SUBSCRIPTION \
  --topic TOPIC \
  --push-endpoint=PUSH_ENDPOINT \
  --push-no-wrapper

替换以下内容:

  • SUBSCRIPTION:拉取订阅的名称或 ID。
  • TOPIC:主题的 ID。
  • PUSH_ENDPOINT:要用作此订阅的端点的网址。例如 https://myproject.appspot.com/myhandler
  • --push-no-wrapper:直接将消息数据作为 HTTP 正文传递。

如需配置包含载荷解封装的订阅并控制 x-goog-pubsub-* 标头的使用,请运行以下命令:

gcloud pubsub subscriptions create SUBSCRIPTION \
  --topic TOPIC \
  --push-endpoint=PUSH_ENDPOINT \
  --push-no-wrapper \
  --push-no-wrapper-write-metadata
  • --push-no-wrapper-write-metadata:如果为 true,则将 Pub/Sub 消息元数据写入 HTTP 请求的 x-goog-pubsub-<KEY>:<VAL> 标头。将 Pub/Sub 消息属性写入 HTTP 请求的 <KEY>:<VAL> 标头。

Python

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档

from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"
# endpoint = "https://my-test-project.appspot.com/push"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

no_wrapper = pubsub_v1.types.PushConfig.NoWrapper(write_metadata=True)
push_config = pubsub_v1.types.PushConfig(
    push_endpoint=endpoint, no_wrapper=no_wrapper
)

# Wrap the subscriber in a 'with' block to automatically call close() to
# close the underlying gRPC channel when done.
with subscriber:
    subscription = subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic_path,
            "push_config": push_config,
        }
    )

print(f"Push no wrapper subscription created: {subscription}.")
print(f"Endpoint for subscription is: {endpoint}")
print(f"No wrapper configuration for subscription is: {no_wrapper}")

Java

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档

/*
 * Copyright 2016 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package pubsub;


import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.PushConfig.NoWrapper;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.SubscriptionName;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;

public class CreateUnwrappedPushSubscriptionExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";
    String topicId = "your-topic-id";
    String pushEndpoint = "https://my-test-project.appspot.com/push";

    createPushSubscriptionExample(projectId, subscriptionId, topicId, pushEndpoint);
  }

  public static void createPushSubscriptionExample(
      String projectId, String subscriptionId, String topicId, String pushEndpoint)
      throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
      TopicName topicName = TopicName.of(projectId, topicId);
      SubscriptionName subscriptionName = SubscriptionName.of(projectId, subscriptionId);
      NoWrapper noWrapper =
          NoWrapper.newBuilder()
              // Determines if message metadata is added to the HTTP headers of
              // the delivered message.
              .setWriteMetadata(true)
              .build();
      PushConfig pushConfig =
          PushConfig.newBuilder().setPushEndpoint(pushEndpoint).setNoWrapper(noWrapper).build();

      // Create a push subscription with default acknowledgement deadline of 10 seconds.
      // Messages not successfully acknowledged within 10 seconds will get resent by the server.
      Subscription subscription =
          subscriptionAdminClient.createSubscription(subscriptionName, topicName, pushConfig, 10);
      System.out.println("Created push subscription: " + subscription.getName());
    }
  }
}

C++

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string const& topic_id,
   std::string const& subscription_id, std::string const& endpoint) {
  google::pubsub::v1::Subscription request;
  request.set_name(
      pubsub::Subscription(project_id, subscription_id).FullName());
  request.set_topic(pubsub::Topic(project_id, topic_id).FullName());
  request.mutable_push_config()->set_push_endpoint(endpoint);
  request.mutable_push_config()->mutable_no_wrapper()->set_write_metadata(
      true);
  auto sub = client.CreateSubscription(request);
  if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The subscription already exists\n";
    return;
  }
  if (!sub) throw std::move(sub).status();

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";
}

Go

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
)

// createPushNoWrapperSubscription creates a push subscription where messages are delivered in the HTTP body.
func createPushNoWrapperSubscription(w io.Writer, projectID, subID string, topic *pubsub.Topic, endpoint string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	// topic of type https://godoc.org/cloud.google.com/go/pubsub#Topic
	// endpoint := "https://my-test-project.appspot.com/push"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub, err := client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{
		Topic:       topic,
		AckDeadline: 10 * time.Second,
		PushConfig: pubsub.PushConfig{
			Endpoint: endpoint,
			Wrapper: &pubsub.NoWrapper{
				// Determines if message metadata is added to the HTTP headers of
				// the delivered message.
				WriteMetadata: true,
			},
		},
	})
	if err != nil {
		return fmt.Errorf("CreateSubscription: %w", err)
	}
	fmt.Fprintf(w, "Created push no wrapper subscription: %v\n", sub)
	return nil
}

Node.js

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const pushEndpoint = 'YOUR_ENDPOINT_URL';
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createPushSubscriptionNoWrapper(
  pushEndpoint,
  topicNameOrId,
  subscriptionNameOrId
) {
  const options = {
    pushConfig: {
      // Set to an HTTPS endpoint of your choice. If necessary, register
      // (authorize) the domain on which the server is hosted.
      pushEndpoint,
      // When true, writes the Pub/Sub message metadata to
      // `x-goog-pubsub-<KEY>:<VAL>` headers of the HTTP request. Writes the
      // Pub/Sub message attributes to `<KEY>:<VAL>` headers of the HTTP request.
      noWrapper: {
        writeMetadata: true,
      },
    },
  };

  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, options);
  console.log(`Subscription ${subscriptionNameOrId} created.`);
}

Node.js

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const pushEndpoint = 'YOUR_ENDPOINT_URL';
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
import {PubSub, CreateSubscriptionOptions} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createPushSubscriptionNoWrapper(
  pushEndpoint: string,
  topicNameOrId: string,
  subscriptionNameOrId: string
) {
  const options: CreateSubscriptionOptions = {
    pushConfig: {
      // Set to an HTTPS endpoint of your choice. If necessary, register
      // (authorize) the domain on which the server is hosted.
      pushEndpoint,
      // When true, writes the Pub/Sub message metadata to
      // `x-goog-pubsub-<KEY>:<VAL>` headers of the HTTP request. Writes the
      // Pub/Sub message attributes to `<KEY>:<VAL>` headers of the HTTP request.
      noWrapper: {
        writeMetadata: true,
      },
    },
  };

  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, options);
  console.log(`Subscription ${subscriptionNameOrId} created.`);
}

在邮件中设置 content-type 标头

启用载荷解封装后,Pub/Sub 不会在请求中自动设置媒体类型标头字段。如果您未明确设置 Content-Type 标头字段,处理您请求的 Web 服务器可能会设置默认值 application/octet-stream,或者以意外的方式解读请求。

如果您需要 Content-Type 标头,请务必在发布时为每个要发布的消息明确声明该标头。为此,您必须先启用写入元数据提供的示例中显示了启用写入元数据的结果。

后续步骤