编写和响应 Pub/Sub 消息

区域 ID

REGION_ID 是 Google 根据您在创建应用时选择的区域分配的缩写代码。此代码不对应于国家/地区或省,尽管某些区域 ID 可能类似于常用国家/地区代码和省代码。对于 2020 年 2 月以后创建的应用,REGION_ID.r 包含在 App Engine 网址中。对于在此日期之前创建的现有应用,网址中的区域 ID 是可选的。

详细了解区域 ID

Pub/Sub 可在应用之间提供可靠的多对多异步消息传递服务。发布者应用可以向某一主题发送消息,其他应用可以订阅该主题以接收消息。

本文档介绍了如何使用 Cloud 客户端库在 App Engine 应用中发送和接收 Pub/Sub 消息。

前提条件

克隆示例应用

将示例应用复制到本地机器,然后导航到 pubsub 目录:

Go

git clone https://github.com/GoogleCloudPlatform/golang-samples.git
cd golang-samples/appengine_flexible/pubsub

Java

git clone https://github.com/GoogleCloudPlatform/java-docs-samples
cd java-docs-samples/flexible/java-11/pubsub/

Node.js

git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples
cd nodejs-docs-samples/appengine/pubsub

PHP

git clone https://github.com/GoogleCloudPlatform/php-docs-samples.git
cd php-docs-samples/pubsub

Python

git clone https://github.com/GoogleCloudPlatform/python-docs-samples
cd python-docs-samples/appengine/flexible/pubsub

Ruby

git clone https://github.com/GoogleCloudPlatform/ruby-docs-samples
cd ruby-docs-samples/appengine/flexible/pubsub/

.NET

git clone  https://github.com/GoogleCloudPlatform/dotnet-docs-samples
cd dotnet-docs-samples/appengine/flexible/Pubsub/Pubsub.Sample

创建主题和订阅

创建主题和订阅,并在订阅中指定 Pub/Sub 服务器应发送请求的目标端点:

Go

# Configure the topic
gcloud pubsub topics create YOUR_TOPIC_NAME

# Configure the push subscription
gcloud pubsub subscriptions create YOUR_SUBSCRIPTION_NAME \
    --topic=YOUR_TOPIC_NAME \
    --push-endpoint=https://YOUR_PROJECT_ID.REGION_ID.r.appspot.com/push-handlers/receive_messages?token=YOUR_TOKEN \
    --ack-deadline=10

用随机密钥令牌替换 YOUR_TOKEN。推送端点使用它来验证请求。

如需将 Pub/Sub 与身份验证搭配使用,请创建另一个订阅:

# Configure the push subscription
gcloud pubsub subscriptions create YOUR_SUBSCRIPTION_NAME \
    --topic=YOUR_TOPIC_NAME \
    --push-auth-service-account=YOUR-SERVICE-ACCOUNT-EMAIL\
    --push-auth-token-audience=OPTIONAL_AUDIENCE_OVERRIDE\
    --push-endpoint=https://YOUR_PROJECT_ID.REGION_ID.r.appspot.com/push-handlers/receive_messages?token=YOUR_TOKEN \
    --ack-deadline=10

# Your service agent
# `service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com` needs to have the
# `iam.serviceAccountTokenCreator` role.
PUBSUB_SERVICE_ACCOUNT="service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com"
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:${PUBSUB_SERVICE_ACCOUNT}"\
    --role='roles/iam.serviceAccountTokenCreator'

YOUR-SERVICE-ACCOUNT-EMAIL 替换为您的服务账号电子邮件地址。

Java

# Configure the topic
gcloud pubsub topics create YOUR_TOPIC_NAME

# Configure the push subscription
gcloud pubsub subscriptions create YOUR_SUBSCRIPTION_NAME \
    --topic=YOUR_TOPIC_NAME \
    --push-endpoint=https://YOUR_PROJECT_ID.REGION_ID.r.appspot.com/push-handlers/receive_messages?token=YOUR_TOKEN \
    --ack-deadline=10

用随机密钥令牌替换 YOUR_TOKEN。推送端点使用它来验证请求。

如需将 Pub/Sub 与身份验证搭配使用,请创建另一个订阅:

# Configure the push subscription
gcloud pubsub subscriptions create YOUR_SUBSCRIPTION_NAME \
    --topic=YOUR_TOPIC_NAME \
    --push-auth-service-account=YOUR-SERVICE-ACCOUNT-EMAIL\
    --push-auth-token-audience=OPTIONAL_AUDIENCE_OVERRIDE\
    --push-endpoint=https://YOUR_PROJECT_ID.REGION_ID.r.appspot.com/push-handlers/receive_messages?token=YOUR_TOKEN \
    --ack-deadline=10

# Your service agent
# `service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com` needs to have the
# `iam.serviceAccountTokenCreator` role.
PUBSUB_SERVICE_ACCOUNT="service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com"
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:${PUBSUB_SERVICE_ACCOUNT}"\
    --role='roles/iam.serviceAccountTokenCreator'

YOUR-SERVICE-ACCOUNT-EMAIL 替换为您的服务账号电子邮件地址。

Node.js

# Configure the topic
gcloud pubsub topics create YOUR_TOPIC_NAME

# Configure the push subscription
gcloud pubsub subscriptions create YOUR_SUBSCRIPTION_NAME \
    --topic=YOUR_TOPIC_NAME \
    --push-endpoint=https://YOUR_PROJECT_ID.REGION_ID.r.appspot.com/push-handlers/receive_messages?token=YOUR_TOKEN \
    --ack-deadline=10

用随机密钥令牌替换 YOUR_TOKEN。推送端点使用它来验证请求。

如需将 Pub/Sub 与身份验证搭配使用,请创建另一个订阅:

# Configure the push subscription
gcloud pubsub subscriptions create YOUR_SUBSCRIPTION_NAME \
    --topic=YOUR_TOPIC_NAME \
    --push-auth-service-account=YOUR-SERVICE-ACCOUNT-EMAIL\
    --push-auth-token-audience=OPTIONAL_AUDIENCE_OVERRIDE\
    --push-endpoint=https://YOUR_PROJECT_ID.REGION_ID.r.appspot.com/push-handlers/receive_messages?token=YOUR_TOKEN \
    --ack-deadline=10

# Your service agent
# `service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com` needs to have the
# `iam.serviceAccountTokenCreator` role.
PUBSUB_SERVICE_ACCOUNT="service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com"
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:${PUBSUB_SERVICE_ACCOUNT}"\
    --role='roles/iam.serviceAccountTokenCreator'

YOUR-SERVICE-ACCOUNT-EMAIL 替换为您的服务账号电子邮件地址。

PHP

# Configure the topic
gcloud pubsub topics create YOUR_TOPIC_NAME

# Configure the push subscription
gcloud pubsub subscriptions create YOUR_SUBSCRIPTION_NAME \
    --topic=YOUR_TOPIC_NAME \
    --push-endpoint=https://YOUR_PROJECT_ID.REGION_ID.r.appspot.com/push-handlers/receive_messages?token=YOUR_TOKEN \
    --ack-deadline=10

用随机密钥令牌替换 YOUR_TOKEN。推送端点使用它来验证请求。

如需将 Pub/Sub 与身份验证搭配使用,请创建另一个订阅:

# Configure the push subscription
gcloud pubsub subscriptions create YOUR_SUBSCRIPTION_NAME \
    --topic=YOUR_TOPIC_NAME \
    --push-auth-service-account=YOUR-SERVICE-ACCOUNT-EMAIL\
    --push-auth-token-audience=OPTIONAL_AUDIENCE_OVERRIDE\
    --push-endpoint=https://YOUR_PROJECT_ID.REGION_ID.r.appspot.com/push-handlers/receive_messages?token=YOUR_TOKEN \
    --ack-deadline=10

# Your service agent
# `service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com` needs to have the
# `iam.serviceAccountTokenCreator` role.
PUBSUB_SERVICE_ACCOUNT="service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com"
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:${PUBSUB_SERVICE_ACCOUNT}"\
    --role='roles/iam.serviceAccountTokenCreator'

YOUR-SERVICE-ACCOUNT-EMAIL 替换为您的服务账号电子邮件地址。

Python

# Configure the topic
gcloud pubsub topics create YOUR_TOPIC_NAME

# Configure the push subscription
gcloud pubsub subscriptions create YOUR_SUBSCRIPTION_NAME \
    --topic=YOUR_TOPIC_NAME \
    --push-endpoint=https://YOUR_PROJECT_ID.REGION_ID.r.appspot.com/push-handlers/receive_messages?token=YOUR_TOKEN \
    --ack-deadline=10

用随机密钥令牌替换 YOUR_TOKEN。推送端点使用它来验证请求。

如需将 Pub/Sub 与身份验证搭配使用,请创建另一个订阅:

# Configure the push subscription
gcloud pubsub subscriptions create YOUR_SUBSCRIPTION_NAME \
    --topic=YOUR_TOPIC_NAME \
    --push-auth-service-account=YOUR-SERVICE-ACCOUNT-EMAIL\
    --push-auth-token-audience=OPTIONAL_AUDIENCE_OVERRIDE\
    --push-endpoint=https://YOUR_PROJECT_ID.REGION_ID.r.appspot.com/push-handlers/receive_messages?token=YOUR_TOKEN \
    --ack-deadline=10

# Your service agent
# `service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com` needs to have the
# `iam.serviceAccountTokenCreator` role.
PUBSUB_SERVICE_ACCOUNT="service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com"
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:${PUBSUB_SERVICE_ACCOUNT}"\
    --role='roles/iam.serviceAccountTokenCreator'

YOUR-SERVICE-ACCOUNT-EMAIL 替换为您的服务账号电子邮件地址。

Ruby

# Configure the topic
gcloud pubsub topics create YOUR_TOPIC_NAME

# Configure the push subscription
gcloud pubsub subscriptions create YOUR_SUBSCRIPTION_NAME \
    --topic=YOUR_TOPIC_NAME \
    --push-endpoint=https://YOUR_PROJECT_ID.REGION_ID.r.appspot.com/push-handlers/receive_messages?token=YOUR_TOKEN \
    --ack-deadline=10

用随机密钥令牌替换 YOUR_TOKEN。推送端点使用它来验证请求。

如需将 Pub/Sub 与身份验证搭配使用,请创建另一个订阅:

# Configure the push subscription
gcloud pubsub subscriptions create YOUR_SUBSCRIPTION_NAME \
    --topic=YOUR_TOPIC_NAME \
    --push-auth-service-account=YOUR-SERVICE-ACCOUNT-EMAIL\
    --push-auth-token-audience=OPTIONAL_AUDIENCE_OVERRIDE\
    --push-endpoint=https://YOUR_PROJECT_ID.REGION_ID.r.appspot.com/push-handlers/receive_messages?token=YOUR_TOKEN \
    --ack-deadline=10

# Your service agent
# `service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com` needs to have the
# `iam.serviceAccountTokenCreator` role.
PUBSUB_SERVICE_ACCOUNT="service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com"
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:${PUBSUB_SERVICE_ACCOUNT}"\
    --role='roles/iam.serviceAccountTokenCreator'

YOUR-SERVICE-ACCOUNT-EMAIL 替换为您的服务账号电子邮件地址。

.NET

# Configure the topic
gcloud pubsub topics create YOUR_TOPIC_NAME

# Configure the push subscription
gcloud pubsub subscriptions create YOUR_SUBSCRIPTION_NAME \
    --topic=YOUR_TOPIC_NAME \
    --push-endpoint=https://YOUR_PROJECT_ID.REGION_ID.r.appspot.com/push-handlers/receive_messages?token=YOUR_TOKEN \
    --ack-deadline=10

用随机密钥令牌替换 YOUR_TOKEN。推送端点使用它来验证请求。

如需将 Pub/Sub 与身份验证搭配使用,请创建另一个订阅:

# Configure the push subscription
gcloud pubsub subscriptions create YOUR_SUBSCRIPTION_NAME \
    --topic=YOUR_TOPIC_NAME \
    --push-auth-service-account=YOUR-SERVICE-ACCOUNT-EMAIL\
    --push-auth-token-audience=OPTIONAL_AUDIENCE_OVERRIDE\
    --push-endpoint=https://YOUR_PROJECT_ID.REGION_ID.r.appspot.com/push-handlers/receive_messages?token=YOUR_TOKEN \
    --ack-deadline=10

# Your service agent
# `service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com` needs to have the
# `iam.serviceAccountTokenCreator` role.
PUBSUB_SERVICE_ACCOUNT="service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com"
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:${PUBSUB_SERVICE_ACCOUNT}"\
    --role='roles/iam.serviceAccountTokenCreator'

YOUR-SERVICE-ACCOUNT-EMAIL 替换为您的服务账号电子邮件地址。

设置环境变量

Go

修改 app.yaml 文件,以为您的主题和验证令牌设置环境变量:

env_variables:
  PUBSUB_TOPIC: your-topic
  # This token is used to verify that requests originate from your
  # application. It can be any sufficiently random string.
  PUBSUB_VERIFICATION_TOKEN: your-token

Java

修改 app.yaml 文件,以为您的主题和验证令牌设置环境变量:

env_variables:
  PUBSUB_TOPIC: <your-topic-name>
  # This token is used to verify that requests originate from your
  # application. It can be any sufficiently random string.
  PUBSUB_VERIFICATION_TOKEN: <your-verification-token>

Node.js

修改 app.yaml 文件,以为您的主题和验证令牌设置环境变量:

env_variables:
  PUBSUB_TOPIC: YOUR_TOPIC_NAME
  # This token is used to verify that requests originate from your
  # application. It can be any sufficiently random string.
  PUBSUB_VERIFICATION_TOKEN: YOUR_VERIFICATION_TOKEN

PHP

修改 index.php 文件,为您的主题和订阅设置环境变量:

$container->set('topic', 'php-example-topic');
$container->set('subscription', 'php-example-subscription');

Python

修改 app.yaml 文件,为您的项目 ID、主题和验证令牌设置环境变量:

env_variables:
    PUBSUB_TOPIC: your-topic
    # This token is used to verify that requests originate from your
    # application. It can be any sufficiently random string.
    PUBSUB_VERIFICATION_TOKEN: 1234abc

Ruby

修改 app.yaml 文件,为您的项目 ID、主题和验证令牌设置环境变量:

env_variables:
    PUBSUB_TOPIC: <pubsub-topic-name>
    # This token is used to verify that requests originate from your
    # application. It can be any sufficiently random string.
    PUBSUB_VERIFICATION_TOKEN: <verification-token>

.NET

修改 app.yaml 文件,以为您的主题和验证令牌设置环境变量:

runtime: aspnetcore
env: flex

runtime_config:
  operating_system: ubuntu22

env_variables:
  TEST_PROJECT_ID: your-project-id
  TEST_VERIFICATION_TOKEN: your-token
  TEST_TOPIC_ID: your-topic
  TEST_SUBSCRIPTION_ID: your-sub
  TEST_AUTH_SUBSCRIPTION_ID: your-auth-sub
  TEST_SERVICE_ACCOUNT_EMAIL: your-service-account-email

代码审核

示例应用使用 Pub/Sub 客户端库

Go

示例应用使用您在 app.yaml 文件(PUBSUB_TOPICPUBSUB_VERIFICATION_TOKEN)中设置的环境变量进行配置。

此实例接收的消息存储在切片中:

messages   []string

pushHandler 函数接收推送的消息,验证令牌,并将消息添加到 messages 切片:


func pushHandler(w http.ResponseWriter, r *http.Request) {
	// Verify the token.
	if r.URL.Query().Get("token") != token {
		http.Error(w, "Bad token", http.StatusBadRequest)
		return
	}
	msg := &pushRequest{}
	if err := json.NewDecoder(r.Body).Decode(msg); err != nil {
		http.Error(w, fmt.Sprintf("Could not decode body: %v", err), http.StatusBadRequest)
		return
	}

	messagesMu.Lock()
	defer messagesMu.Unlock()
	// Limit to ten.
	messages = append(messages, string(msg.Message.Data))
	if len(messages) > maxMessages {
		messages = messages[len(messages)-maxMessages:]
	}
}

publishHandler 函数向该主题发布新消息。


func publishHandler(w http.ResponseWriter, r *http.Request) {
	ctx := context.Background()

	msg := &pubsub.Message{
		Data: []byte(r.FormValue("payload")),
	}

	if _, err := topic.Publish(ctx, msg).Get(ctx); err != nil {
		http.Error(w, fmt.Sprintf("Could not publish message: %v", err), 500)
		return
	}

	fmt.Fprint(w, "Message published.")
}

Java

示例应用使用您在 app.yaml 文件中设置的值来配置环境变量。推送请求处理程序使用这些值来确认请求来自 Pub/Sub 且来源可信:

String pubsubVerificationToken = System.getenv("PUBSUB_VERIFICATION_TOKEN");

示例应用维护一个 Cloud Datastore 数据库实例以用于存储消息。PubSubPush Servlet 用于接收推送的消息并将其添加到 messageRepository 数据库实例中:

版本 11/17

@WebServlet(value = "/pubsub/push")
@MultipartConfig()
public class PubSubPush extends HttpServlet {

  @Override
  public void doPost(HttpServletRequest req, HttpServletResponse resp)
      throws IOException, ServletException {
    String pubsubVerificationToken = System.getenv("PUBSUB_VERIFICATION_TOKEN");
    // Do not process message if request token does not match pubsubVerificationToken
    if (pubsubVerificationToken == null
        || pubsubVerificationToken.compareTo(req.getParameter("token")) != 0) {
      resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
      return;
    }
    // parse message object from "message" field in the request body json
    // decode message data from base64
    Message message = getMessage(req);
    try {
      messageRepository.save(message);
      // 200, 201, 204, 102 status codes are interpreted as success by the Pub/Sub system
      resp.setStatus(HttpServletResponse.SC_OK);
    } catch (Exception e) {
      System.out.println(e);
      resp.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
    }
  }

版本 8

@WebServlet(value = "/pubsub/push")
public class PubSubPush extends HttpServlet {

  @Override
  public void doPost(HttpServletRequest req, HttpServletResponse resp)
      throws IOException, ServletException {
    String pubsubVerificationToken = System.getenv("PUBSUB_VERIFICATION_TOKEN");
    // Do not process message if request token does not match pubsubVerificationToken
    if (req.getParameter("token").compareTo(pubsubVerificationToken) != 0) {
      resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
      return;
    }
    // parse message object from "message" field in the request body json
    // decode message data from base64
    Message message = getMessage(req);
    try {
      messageRepository.save(message);
      // 200, 201, 204, 102 status codes are interpreted as success by the Pub/Sub system
      resp.setStatus(HttpServletResponse.SC_OK);
    } catch (Exception e) {
      resp.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
    }
  }

PubSubPublish Servlet 与 App Engine Web 应用交互,以发布新消息并显示收到的消息:

@WebServlet(name = "Publish with PubSub", value = "/pubsub/publish")
public class PubSubPublish extends HttpServlet {

  @Override
  public void doPost(HttpServletRequest req, HttpServletResponse resp)
      throws IOException, ServletException {
    Publisher publisher = this.publisher;
    try {
      String topicId = System.getenv("PUBSUB_TOPIC");
      // create a publisher on the topic
      if (publisher == null) {
        publisher = Publisher.newBuilder(
            ProjectTopicName.of(ServiceOptions.getDefaultProjectId(), topicId))
            .build();
      }
      // construct a pubsub message from the payload
      final String payload = req.getParameter("payload");
      PubsubMessage pubsubMessage =
          PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(payload)).build();

      publisher.publish(pubsubMessage);
      // redirect to home page
      resp.sendRedirect("/");
    } catch (Exception e) {
      resp.sendError(HttpStatus.SC_INTERNAL_SERVER_ERROR, e.getMessage());
    }
  }

Node.js

示例应用使用您在 app.yaml 文件中设置的值来配置环境变量。推送请求处理程序使用这些值来确认请求来自 Pub/Sub 且来源可信:

// The following environment variables are set by the `app.yaml` file when
// running on App Engine, but will need to be manually set when running locally.
var PUBSUB_VERIFICATION_TOKEN = process.env.PUBSUB_VERIFICATION_TOKEN;
var pubsub = gcloud.pubsub({
    projectId: process.env.GOOGLE_CLOUD_PROJECT
});
var topic = pubsub.topic(process.env.PUBSUB_TOPIC);

示例应用维护一个全局列表以存储此实例接收的消息:

// List of all messages received by this instance
var messages = [];

此方法接收推送的消息并将其添加到 messages 全局列表中:

app.post('/pubsub/push', jsonBodyParser, (req, res) => {
  if (req.query.token !== PUBSUB_VERIFICATION_TOKEN) {
    res.status(400).send();
    return;
  }

  // The message is a unicode string encoded in base64.
  const message = Buffer.from(req.body.message.data, 'base64').toString(
    'utf-8'
  );

  messages.push(message);

  res.status(200).send();
});

此方法与 App Engine Web 应用交互,以发布新消息并显示收到的消息:

app.get('/', (req, res) => {
  res.render('index', {messages, tokens, claims});
});

app.post('/', formBodyParser, async (req, res, next) => {
  if (!req.body.payload) {
    res.status(400).send('Missing payload');
    return;
  }

  const data = Buffer.from(req.body.payload);
  try {
    const messageId = await topic.publishMessage({data});
    res.status(200).send(`Message ${messageId} sent.`);
  } catch (error) {
    next(error);
  }
});

PHP

示例应用使用您在 app.yaml 文件中设置的值来配置环境变量。推送请求处理程序使用这些值来确认请求来自 Pub/Sub 且来源可信:

runtime: php
env: flex

示例应用维护一个全局列表以存储此实例接收的消息:

$messages = [];

pull 方法从您创建的主题中检索消息,并将消息添加到消息列表中:

// get PULL pubsub messages
$pubsub = new PubSubClient([
    'projectId' => $projectId,
]);
$subscription = $pubsub->subscription($subscriptionName);
$pullMessages = [];
foreach ($subscription->pull(['returnImmediately' => true]) as $pullMessage) {
    $pullMessages[] = $pullMessage;
    $messages[] = $pullMessage->data();
}
// acknowledge PULL messages
if ($pullMessages) {
    $subscription->acknowledgeBatch($pullMessages);
}

publish 方法向主题发布新消息:

if ($message = (string) $request->getBody()) {
    // Publish the pubsub message to the topic
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $topic = $pubsub->topic($topicName);
    $topic->publish(['data' => $message]);
    return $response->withStatus(204);
}

Python

示例应用使用您在 app.yaml 文件中设置的值来配置环境变量。推送请求处理程序使用这些值来确认请求来自 Pub/Sub 且来源可信:

app.config['PUBSUB_VERIFICATION_TOKEN'] = \
    os.environ['PUBSUB_VERIFICATION_TOKEN']
app.config['PUBSUB_TOPIC'] = os.environ['PUBSUB_TOPIC']

示例应用维护一个全局列表以存储此实例接收的消息:

MESSAGES = []

pubsub_push() 方法会接收推送的消息并将其添加到 MESSAGES 全局列表中:

@app.route("/pubsub/push", methods=["POST"])
def pubsub_push():
    if request.args.get("token", "") != current_app.config["PUBSUB_VERIFICATION_TOKEN"]:
        return "Invalid request", 400

    envelope = json.loads(request.data.decode("utf-8"))
    payload = base64.b64decode(envelope["message"]["data"])

    MESSAGES.append(payload)

    # Returning any 2xx status indicates successful receipt of the message.
    return "OK", 200

index() 方法与 App Engine Web 应用交互,以发布新消息并显示收到的消息:

@app.route("/", methods=["GET", "POST"])
def index():
    if request.method == "GET":
        return render_template("index.html", messages=MESSAGES)

    data = request.form.get("payload", "Example payload").encode("utf-8")

    # publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(
        current_app.config["PROJECT"], current_app.config["PUBSUB_TOPIC"]
    )

    publisher.publish(topic_path, data=data)

    return "OK", 200

Ruby

示例应用使用您在 app.yaml 文件中设置的值来配置环境变量。推送请求处理程序使用这些值来确认请求来自 Pub/Sub 且来源可信:

topic = pubsub.topic ENV["PUBSUB_TOPIC"]
PUBSUB_VERIFICATION_TOKEN = ENV["PUBSUB_VERIFICATION_TOKEN"]

示例应用维护一个全局列表以存储此实例接收的消息:

# List of all messages received by this instance
messages = []

此方法接收推送的消息并将其添加到 messages 全局列表中:

post "/pubsub/push" do
  halt 400 if params[:token] != PUBSUB_VERIFICATION_TOKEN

  message = JSON.parse request.body.read
  payload = Base64.decode64 message["message"]["data"]

  messages.push payload
end

此方法与 App Engine 网络应用交互,以发布新消息并显示收到的消息:

get "/" do
  @claims = claims
  @messages = messages

  slim :index
end

post "/publish" do
  topic.publish params[:payload]

  redirect "/", 303
end

.NET

[HttpGet]
[HttpPost]
public async Task<IActionResult> IndexAsync(MessageForm messageForm)
{
    var model = new MessageList();
    if (!_options.HasGoodProjectId())
    {
        model.MissingProjectId = true;
        return View(model);
    }
    if (!string.IsNullOrEmpty(messageForm.Message))
    {
        // Publish the message.
        var pubsubMessage = new PubsubMessage()
        {
            Data = ByteString.CopyFromUtf8(messageForm.Message)
        };
        pubsubMessage.Attributes["token"] = _options.VerificationToken;
        await _publisher.PublishAsync(pubsubMessage);
        model.PublishedMessage = messageForm.Message;
    }
    // Render the current list of messages.
    model.Messages = s_receivedMessages.ToArray();
    model.AuthMessages = s_authenticatedMessages.ToArray();
    return View(model);
}

在本地运行示例

在本地运行时,您可以借助 Google Cloud CLI 提供身份验证以使用 Google Cloud API。假设您按照前提条件中的说明设置了环境,那么您已经运行了 gcloud init 命令,该命令可提供此身份验证。

Go

在启动应用之前设置环境变量:

export GOOGLE_CLOUD_PROJECT=[your-project-id]
export PUBSUB_VERIFICATION_TOKEN=[your-token]
export PUBSUB_TOPIC=[your-topic]
go run pubsub.go

Java

mvn clean package

在启动应用之前设置环境变量:

export PUBSUB_VERIFICATION_TOKEN=[your-verification-token]
export PUBSUB_TOPIC=[your-topic]
mvn jetty:run

Node.js

在启动应用之前设置环境变量:

export GOOGLE_CLOUD_PROJECT=[your-project-id]
export PUBSUB_VERIFICATION_TOKEN=[your-verification-token]
export PUBSUB_TOPIC=[your-topic]
npm install
npm start

PHP

使用 Composer 安装依赖项:

composer install

然后在启动应用之前设置环境变量:

export GOOGLE_CLOUD_PROJECT=[your-project-id]
export PUBSUB_VERIFICATION_TOKEN=[your-verification-token]
export PUBSUB_TOPIC=[your-topic]
php -S localhost:8080

Python

安装依赖项(最好在虚拟环境中)。

Mac OS/Linux

  1. 创建一个独立的 Python 环境
    python3 -m venv env
    source env/bin/activate
  2. 如果您当前不在包含示例代码的目录中,则导航到包含 hello_world 示例代码的目录。然后安装依赖项:
    cd YOUR_SAMPLE_CODE_DIR
    pip install -r requirements.txt

Windows

使用 Powershell 运行 Python 包。

  1. 找到已安装的 PowerShell
  2. 右键点击 Powershell 的快捷方式,并以管理员身份启动。
  3. 创建一个独立的 Python 环境
    python -m venv env
    .\env\Scripts\activate
  4. 导航到项目目录并安装依赖项。如果您当前不在包含示例代码的目录中,则导航到包含 hello_world 示例代码的目录。然后安装依赖项:
    cd YOUR_SAMPLE_CODE_DIR
    pip install -r requirements.txt

然后在启动应用之前设置环境变量:

export GOOGLE_CLOUD_PROJECT=[your-project-id]
export PUBSUB_VERIFICATION_TOKEN=[your-verification-token]
export PUBSUB_TOPIC=[your-topic]
python main.py

Ruby

安装依赖项:

bundle install

然后在启动应用之前设置环境变量:

export GOOGLE_CLOUD_PROJECT=[your-project-id]
export PUBSUB_VERIFICATION_TOKEN=[your-verification-token]
export PUBSUB_TOPIC=[your-topic]
bundle exec ruby app.rb -p 8080

.NET

从应用的根目录运行以下命令:

    dotnet restore
    dotnet run

在网络浏览器中,输入 http://localhost:5000/。如要退出 Web 服务器,请在终端窗口中按 Ctrl+C。

模拟推送通知

应用可以在本地发送消息,但无法在本地接收推送消息。但是,您可以向本地推送通知端点发送 HTTP 请求来模拟推送消息。该示例包含文件 sample_message.json

Go

您可以使用 curlhttpie 客户端发送 HTTP POST 请求:

curl -H "Content-Type: application/json" -i --data @sample_message.json "localhost:8080/push-handlers/receive_messages?token=[your-token]"

http POST ":8080/push-handlers/receive_messages?token=[your-token]" < sample_message.json

响应:

HTTP/1.1 200 OK
Date: Tue, 13 Nov 2018 16:04:18 GMT
Content-Length: 0

请求完成后,您可以刷新 localhost:8080 并在收到的消息列表中查看该消息。

Java

您可以使用 curlhttpie 客户端发送 HTTP POST 请求:

curl -H "Content-Type: application/json" -i --data @sample_message.json "localhost:8080/push-handlers/receive_messages?token=[your-token]"

http POST ":8080/push-handlers/receive_messages?token=[your-token]" < sample_message.json

响应:

HTTP/1.1 200 OK
Date: Wed, 26 Apr 2017 00:03:28 GMT
Content-Length: 0
Server: Jetty(9.3.8.v20160314)

请求完成后,您可以刷新 localhost:8080 并在收到的消息列表中查看该消息。

Node.js

您可以使用 curlhttpie 客户端发送 HTTP POST 请求:

curl -H "Content-Type: application/json" -i --data @sample_message.json "localhost:8080/push-handlers/receive_messages?token=[your-token]"

http POST ":8080/push-handlers/receive_messages?token=[your-token]" < sample_message.json

响应:

HTTP/1.1 200 OK
Connection: keep-alive
Date: Mon, 31 Aug 2015 22:19:50 GMT
Transfer-Encoding: chunked
X-Powered-By: Express

请求完成后,您可以刷新 localhost:8080 并在收到的消息列表中查看该消息。

PHP

您可以使用 curlhttpie 客户端发送 HTTP POST 请求:

curl -i --data @sample_message.json "localhost:4567/push-handlers/receive_messages?token=[your-token]"

http POST ":4567/push-handlers/receive_messages?token=[your-token]" < sample_message.json

请求完成后,您可以刷新 localhost:8080 并在收到的消息列表中查看该消息。

Python

您可以使用 curlhttpie 客户端发送 HTTP POST 请求:

curl -H "Content-Type: application/json" -i --data @sample_message.json "localhost:8080/pubsub/push?token=[your-token]"

http POST ":8080/pubsub/push?token=[your-token]" < sample_message.json

响应:

HTTP/1.0 200 OK
Content-Length: 2
Content-Type: text/html; charset=utf-8
Date: Mon, 10 Aug 2015 17:52:03 GMT
Server: Werkzeug/0.10.4 Python/2.7.10

OK

请求完成后,您可以刷新 localhost:8080 并在收到的消息列表中查看该消息。

Ruby

您可以使用 curlhttpie 客户端发送 HTTP POST 请求:

curl -i --data @sample_message.json "localhost:4567/push-handlers/receive_messages?token=[your-token]"

http POST ":4567/push-handlers/receive_messages?token=[your-token]" < sample_message.json

响应:

HTTP/1.1 200 OK
Content-Type: text/html;charset=utf-8
Content-Length: 13
X-Xss-Protection: 1; mode=block
X-Content-Type-Options: nosniff
X-Frame-Options: SAMEORIGIN
Server: WEBrick/1.3.1 (Ruby/2.3.0/2015-12-25)
Date: Wed, 20 Apr 2016 20:56:23 GMT
Connection: Keep-Alive

Hello, World!

请求完成后,您可以刷新 localhost:8080 并在收到的消息列表中查看该消息。

.NET

如需发送 HTTP POST 请求:

Get-Content -Raw .\sample_message.json | Invoke-WebRequest -Uri
http://localhost:5000/Push?token=your-secret-token -Method POST -ContentType
'text/json' -OutFile out.txt

请求完成后,您可以刷新 localhost:5000 并在收到的消息列表中查看该消息。

在 App Engine 上运行

如需使用 gcloud 命令行工具将演示版应用部署到 App Engine,请执行以下操作:

Go

app.yaml 文件所在的目录运行以下命令:

gcloud app deploy

Java

如需使用 Maven 部署应用,请运行以下命令:

mvn package appengine:deploy -Dapp.deploy.projectId=PROJECT_ID

PROJECT_ID 替换为您的 Google Cloud 项目的 ID。 如果您的 pom.xml 文件已经指定了您的项目 ID,则您无需在运行的命令中添加 -Dapp.deploy.projectId 属性。

Node.js

app.yaml 文件所在的目录运行以下命令:

gcloud app deploy

PHP

app.yaml 文件所在的目录运行以下命令:

gcloud app deploy

Python

app.yaml 文件所在的目录运行以下命令:

gcloud app deploy

Ruby

app.yaml 文件所在的目录运行以下命令:

gcloud app deploy app.yaml

.NET

app.yaml 文件所在的目录运行以下命令:

gcloud app deploy

您现在可以通过 https://PROJECT_ID.REGION_ID.r.appspot.com 访问该应用。您可以使用表单提交消息,但无法保证您的哪个应用实例会收到通知。您可以发送多条消息并刷新页面,以查看收到的消息。