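With OpenTelemetry tracing, you can identify and trace the latency of various Pub/Sub client library operations, such as batching, lease management, and flow control. Collecting this information can help you debug client library issues.
Some situations where OpenTelemetry tracing can help include the following:
- Your service is experiencing higher publish latency than normal.
- You are receiving a high number of redelivered messages.
- A change to the callback function of your subscriber client causes processing to take longer than usual.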
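Before you begin
Before configuring OpenTelemetry, complete the following tasks:
- Set up Pub/Sub by using one of the client libraries.
- Install the OpenTelemetry SDK and set up a trace exporter and a tracer provider.
- Enable the Cloud Trace API.
- Understand how to read Cloud Observability traces.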
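Required roles
To ensure that the service account has the necessary permissions to export traces to Cloud Trace, ask your administrator to grant the service account the following IAM role on your project:
- Cloud Trace Agent (roles/cloudtrace.agent)
For more information about granting roles, see "Manage access to projects, folders, and organizations".
This predefined role contains the permissions required to export traces to Cloud Trace. The exact permissions that are required are listed in the following section.
Required permissions
The following permission is required to export traces to Cloud Trace:
- cloudtrace.traces.patch
Your administrator might also be able to give the service account this permission through custom roles or other predefined roles.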
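OpenTelemetry tracing workflow
To set up OpenTelemetry tracing, you use the Pub/Sub client libraries and the OpenTelemetry SDK. With the SDK, you must set up a trace exporter and a tracer provider before connecting to the Pub/Sub libraries. In some libraries, setting up a tracer provider is optional.
Trace exporter. The OpenTelemetry SDK uses the trace exporter to determine where to send traces.
Tracer provider. The Pub/Sub client libraries use the tracer provider to create traces.
The following steps outline how to set up tracing:
- Instantiate a Cloud Trace OpenTelemetry exporter.
- If required, instantiate and register a tracer provider by using the OpenTelemetry SDK.
- Configure your client with the enable OpenTelemetry tracing option.
- Use the Pub/Sub client libraries to publish a message.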
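How tracing works
For every message published, the client library creates a new trace. This trace represents the entire lifecycle of the message, from the moment you publish the message to the moment the message is acknowledged. A trace encapsulates information such as the duration of operations, parent spans and child spans, and linked spans.
A trace consists of a root span and its corresponding child spans. These spans represent the work that the client library does when processing a message. Each message trace contains the following:
- For publishing: flow control, ordering key scheduling, batching, and the duration of the publish RPC.
- For subscribing: concurrency control, ordering key scheduling, and lease management.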
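As a rough illustration of the root-and-child structure described above, the following Python sketch models a trace as a small tree and picks out the child span that took the longest. It uses only the standard library. The Span class, the slowest_child helper, and the span names and timings are all hypothetical and are not part of the Pub/Sub client libraries or the OpenTelemetry SDK.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Span:
    """Hypothetical, simplified span: a name, a time range, and child spans."""
    name: str
    start_ms: float
    end_ms: float
    children: List["Span"] = field(default_factory=list)

    @property
    def duration_ms(self):
        return self.end_ms - self.start_ms


def slowest_child(root):
    """Return the direct child span that took the longest, or None."""
    return max(root.children, key=lambda s: s.duration_ms, default=None)


if __name__ == "__main__":
    # Span names and timings are made up for illustration.
    root = Span("create", 0.0, 48.0, [
        Span("flow control", 0.0, 1.0),
        Span("batching", 1.0, 11.0),
        Span("publish RPC", 11.0, 48.0),
    ])
    slowest = slowest_child(root)
    print(f"{slowest.name}: {slowest.duration_ms} ms")
```

Comparing child durations in this way is the kind of reading you would do in a trace viewer when publish latency is higher than normal.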
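To propagate information from the publish side to the subscribe side, the client libraries inject a tracing-specific attribute into the message on the publish side. This context propagation mechanism is enabled only when tracing is turned on, and the injected attribute is prefixed with googclient_.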
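The propagation described above can be sketched roughly as follows. This Python example stores a W3C-style traceparent value in a message attribute on the publish side and reads it back on the subscribe side. It uses only the standard library and is not the client library's implementation. Only the googclient_ prefix comes from this page; the full key googclient_traceparent and the helpers inject_context and extract_context are assumptions made for this example.

```python
import secrets

# Prefix named in the text above; the "traceparent" suffix is an assumption.
ATTRIBUTE_PREFIX = "googclient_"
TRACEPARENT_KEY = ATTRIBUTE_PREFIX + "traceparent"


def inject_context(attributes, trace_id, span_id, sampled=True):
    """Return a copy of attributes with a W3C-style traceparent value added."""
    flags = 1 if sampled else 0
    value = f"00-{trace_id:032x}-{span_id:016x}-{flags:02x}"
    return {**attributes, TRACEPARENT_KEY: value}


def extract_context(attributes):
    """Return (trace_id, span_id, sampled), or None if nothing was injected."""
    value = attributes.get(TRACEPARENT_KEY)
    if value is None:
        return None
    _version, trace_hex, span_hex, flags_hex = value.split("-")
    return int(trace_hex, 16), int(span_hex, 16), bool(int(flags_hex, 16) & 1)


if __name__ == "__main__":
    # Publish side: generate IDs and attach them to the outgoing attributes.
    trace_id = secrets.randbits(128)
    span_id = secrets.randbits(64)
    outgoing = inject_context({"origin": "example"}, trace_id, span_id)

    # Subscribe side: recover the same IDs so that new spans join the trace.
    print(extract_context(outgoing) == (trace_id, span_id, True))
```

Because the context travels as an ordinary message attribute, a subscriber that has tracing turned off can ignore it without affecting message delivery.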
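Publish messages with tracing
The following code samples show you how to enable tracing by using the Pub/Sub client library and the OpenTelemetry SDK. In these samples, the tracing results are exported to Cloud Trace.
Considerations
When instantiating the tracer provider, you configure a sampling ratio with the OpenTelemetry SDK. This ratio determines what fraction of traces the SDK samples. A lower sampling ratio can help reduce billing costs and prevent your service from exceeding the Cloud Trace span quota.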
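To make the effect of the ratio concrete, here is a minimal Python sketch of a ratio-based sampling decision wrapped in a parent-based rule, similar in spirit to the ParentBased and TraceIdRatioBased combination that the Go and Python samples on this page configure. It uses only the standard library. The function names and the exact arithmetic are assumptions for illustration and are not taken from the OpenTelemetry SDK source.

```python
import random

# Trace IDs are 128-bit. This sketch compares the low 64 bits against a
# bound derived from the ratio so that the decision is deterministic for a
# given trace ID. Treat the exact arithmetic as an assumption.
TRACE_ID_MASK = (1 << 64) - 1


def ratio_sampled(trace_id, ratio):
    """Decide from the trace ID alone whether a root trace is sampled."""
    if not 0.0 <= ratio <= 1.0:
        raise ValueError("ratio must be between 0.0 and 1.0")
    bound = round(ratio * (TRACE_ID_MASK + 1))
    return (trace_id & TRACE_ID_MASK) < bound


def parent_based_sampled(trace_id, ratio, parent_sampled=None):
    """Follow the parent's decision when there is one; else use the ratio."""
    if parent_sampled is not None:
        return parent_sampled
    return ratio_sampled(trace_id, ratio)


if __name__ == "__main__":
    rng = random.Random(0)
    trace_ids = [rng.getrandbits(128) for _ in range(10_000)]
    kept = sum(parent_based_sampled(t, 0.25) for t in trace_ids)
    print(f"kept {kept} of {len(trace_ids)} traces")
```

With a parent-based rule, a subscriber follows the sampling decision that was propagated from the publisher, so a message's trace is either recorded end to end or not recorded at all.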
Go
import (
"context"
"fmt"
"io"
"cloud.google.com/go/pubsub"
"go.opentelemetry.io/otel"
"google.golang.org/api/option"
texporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
)
// publishOpenTelemetryTracing publishes a single message with OpenTelemetry tracing
// enabled, exporting to Cloud Trace.
func publishOpenTelemetryTracing(w io.Writer, projectID, topicID string, sampling float64) error {
// projectID := "my-project-id"
// topicID := "my-topic"
ctx := context.Background()
exporter, err := texporter.New(texporter.WithProjectID(projectID),
// Disable spans created by the exporter.
texporter.WithTraceClientOptions(
[]option.ClientOption{option.WithTelemetryDisabled()},
),
)
if err != nil {
return fmt.Errorf("error instantiating exporter: %w", err)
}
resources := resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNameKey.String("publisher"),
)
// Instantiate a tracer provider with the following settings
tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exporter),
sdktrace.WithResource(resources),
sdktrace.WithSampler(
sdktrace.ParentBased(sdktrace.TraceIDRatioBased(sampling)),
),
)
defer tp.ForceFlush(ctx) // flushes any pending spans
otel.SetTracerProvider(tp)
// Create a new client with tracing enabled.
client, err := pubsub.NewClientWithConfig(ctx, projectID, &pubsub.ClientConfig{
EnableOpenTelemetryTracing: true,
})
if err != nil {
return fmt.Errorf("pubsub: NewClient: %w", err)
}
defer client.Close()
t := client.Topic(topicID)
result := t.Publish(ctx, &pubsub.Message{
Data: []byte("Publishing message with tracing"),
})
if _, err := result.Get(ctx); err != nil {
return fmt.Errorf("pubsub: result.Get: %w", err)
}
fmt.Fprintln(w, "Published a traced message")
return nil
}
C++
// Create a few namespace aliases to make the code easier to read.
namespace gc = ::google::cloud;
namespace otel = gc::otel;
namespace pubsub = gc::pubsub;
// This example uses a simple wrapper to export (upload) OTel tracing data
// to Google Cloud Trace. More complex applications may use different
// authentication, or configure their own OTel exporter.
auto project = gc::Project(project_id);
auto configuration = otel::ConfigureBasicTracing(project);
auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
pubsub::Topic(project_id, topic_id),
// Configure this publisher to enable OTel tracing. Some applications may
// choose to disable tracing in some publishers or to dynamically enable
// this option based on their own configuration.
gc::Options{}.set<gc::OpenTelemetryTracingOption>(true)));
// After this point, use the Cloud Pub/Sub C++ client library as usual.
// In this example, we will send a few messages and configure a callback
// action for each one.
std::vector<gc::future<void>> ids;
for (int i = 0; i < 5; i++) {
auto id = publisher.Publish(pubsub::MessageBuilder().SetData("Hi!").Build())
.then([](gc::future<gc::StatusOr<std::string>> f) {
auto id = f.get();
if (!id) {
std::cout << "Error in publish: " << id.status() << "\n";
return;
}
std::cout << "Sent message with id: (" << *id << ")\n";
});
ids.push_back(std::move(id));
}
// Block until the messages are actually sent.
for (auto& id : ids) id.get();
Python
Before trying this sample, follow the Python setup instructions in "Quickstart: Using Client Libraries". For more information, see the Pub/Sub Python API reference documentation.
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
BatchSpanProcessor,
)
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.trace.sampling import TraceIdRatioBased, ParentBased
from google.cloud.pubsub_v1 import PublisherClient
from google.cloud.pubsub_v1.types import PublisherOptions
# TODO(developer)
# topic_project_id = "your-topic-project-id"
# trace_project_id = "your-trace-project-id"
# topic_id = "your-topic-id"
# In this sample, we use Google Cloud Trace to export the OpenTelemetry
# traces: https://cloud.google.com/trace/docs/setup/python-ot
# Choose and configure the exporter for your setup accordingly.
sampler = ParentBased(root=TraceIdRatioBased(1))
trace.set_tracer_provider(TracerProvider(sampler=sampler))
# Export to Google Trace.
cloud_trace_exporter = CloudTraceSpanExporter(
project_id=trace_project_id,
)
trace.get_tracer_provider().add_span_processor(
BatchSpanProcessor(cloud_trace_exporter)
)
# Set the `enable_open_telemetry_tracing` option to True when creating
# the publisher client. This in itself is necessary and sufficient for
# the library to export OpenTelemetry traces. However, where the traces
# must be exported to needs to be configured based on your OpenTelemetry
# set up. Refer: https://opentelemetry.io/docs/languages/python/exporters/
publisher = PublisherClient(
publisher_options=PublisherOptions(
enable_open_telemetry_tracing=True,
),
)
# The `topic_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/topics/{topic_id}`
topic_path = publisher.topic_path(topic_project_id, topic_id)
# Publish messages.
for n in range(1, 10):
    data_str = f"Message number {n}"
    # Data must be a bytestring
    data = data_str.encode("utf-8")
    # When you publish a message, the client returns a future.
    future = publisher.publish(topic_path, data)
    print(future.result())
print(f"Published messages to {topic_path}.")
TypeScript
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_OR_ID';
// const data = 'Hello, world!';
// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';
// Imports the OpenTelemetry API
import {NodeTracerProvider} from '@opentelemetry/sdk-trace-node';
import {diag, DiagConsoleLogger, DiagLogLevel} from '@opentelemetry/api';
import {SimpleSpanProcessor} from '@opentelemetry/sdk-trace-base';
// To output to the console for testing, use the ConsoleSpanExporter.
// import {ConsoleSpanExporter} from '@opentelemetry/sdk-trace-base';
// To output to Cloud Trace, import the OpenTelemetry bridge library.
import {TraceExporter} from '@google-cloud/opentelemetry-cloud-trace-exporter';
import {Resource} from '@opentelemetry/resources';
import {SEMRESATTRS_SERVICE_NAME} from '@opentelemetry/semantic-conventions';
// Enable the diagnostic logger for OpenTelemetry
diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.DEBUG);
// Log spans out to the console, for testing.
// const exporter = new ConsoleSpanExporter();
// Log spans out to Cloud Trace, for production.
const exporter = new TraceExporter();
// Build a tracer provider and a span processor to do
// something with the spans we're generating.
const provider = new NodeTracerProvider({
resource: new Resource({
[SEMRESATTRS_SERVICE_NAME]: 'otel publisher example',
}),
});
const processor = new SimpleSpanProcessor(exporter);
provider.addSpanProcessor(processor);
provider.register();
// Creates a client; cache this for further use.
const pubSubClient = new PubSub({enableOpenTelemetryTracing: true});
async function publishMessage(topicNameOrId: string, data: string) {
// Publishes the message as a string, e.g. "Hello, world!"
// or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);
// Cache topic objects (publishers) and reuse them.
const publisher = pubSubClient.topic(topicNameOrId);
const messageId = await publisher.publishMessage({data: dataBuffer});
console.log(`Message ${messageId} published.`);
// The rest of the sample is in service to making sure that any
// buffered Pub/Sub messages and/or OpenTelemetry spans are properly
// flushed to the server side. In normal usage, you'd only need to do
// something like this on process shutdown.
await publisher.flush();
await processor.forceFlush();
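// OTEL_TIMEOUT is not defined in this excerpt; the 2-second value is an
// assumption for illustration.
const OTEL_TIMEOUT = 2;
await new Promise(r => setTimeout(r, OTEL_TIMEOUT * 1000));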
}
Node.js
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_OR_ID';
// const data = 'Hello, world!';
// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');
// Imports the OpenTelemetry API
const {NodeTracerProvider} = require('@opentelemetry/sdk-trace-node');
const {diag, DiagConsoleLogger, DiagLogLevel} = require('@opentelemetry/api');
const {SimpleSpanProcessor} = require('@opentelemetry/sdk-trace-base');
// To output to the console for testing, use the ConsoleSpanExporter.
// const {ConsoleSpanExporter} = require('@opentelemetry/sdk-trace-base');
// To output to Cloud Trace, import the OpenTelemetry bridge library.
const {
TraceExporter,
} = require('@google-cloud/opentelemetry-cloud-trace-exporter');
const {Resource} = require('@opentelemetry/resources');
const {
SEMRESATTRS_SERVICE_NAME,
} = require('@opentelemetry/semantic-conventions');
// Enable the diagnostic logger for OpenTelemetry
diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.DEBUG);
// Log spans out to the console, for testing.
// const exporter = new ConsoleSpanExporter();
// Log spans out to Cloud Trace, for production.
const exporter = new TraceExporter();
// Build a tracer provider and a span processor to do
// something with the spans we're generating.
const provider = new NodeTracerProvider({
resource: new Resource({
[SEMRESATTRS_SERVICE_NAME]: 'otel publisher example',
}),
});
const processor = new SimpleSpanProcessor(exporter);
provider.addSpanProcessor(processor);
provider.register();
// Creates a client; cache this for further use.
const pubSubClient = new PubSub({enableOpenTelemetryTracing: true});
async function publishMessage(topicNameOrId, data) {
// Publishes the message as a string, e.g. "Hello, world!"
// or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);
// Cache topic objects (publishers) and reuse them.
const publisher = pubSubClient.topic(topicNameOrId);
const messageId = await publisher.publishMessage({data: dataBuffer});
console.log(`Message ${messageId} published.`);
// The rest of the sample is in service to making sure that any
// buffered Pub/Sub messages and/or OpenTelemetry spans are properly
// flushed to the server side. In normal usage, you'd only need to do
// something like this on process shutdown.
await publisher.flush();
await processor.forceFlush();
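// OTEL_TIMEOUT is not defined in this excerpt; the 2-second value is an
// assumption for illustration.
const OTEL_TIMEOUT = 2;
await new Promise(r => setTimeout(r, OTEL_TIMEOUT * 1000));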
}
Java
import com.google.api.core.ApiFuture;
import com.google.cloud.opentelemetry.trace.TraceConfiguration;
import com.google.cloud.opentelemetry.trace.TraceExporter;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import io.opentelemetry.semconv.ResourceAttributes;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class OpenTelemetryPublisherExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String topicId = "your-topic-id";
openTelemetryPublisherExample(projectId, topicId);
}
public static void openTelemetryPublisherExample(String projectId, String topicId)
throws IOException, ExecutionException, InterruptedException {
Resource resource =
Resource.getDefault().toBuilder()
.put(ResourceAttributes.SERVICE_NAME, "publisher-example")
.build();
// Creates a Cloud Trace exporter.
SpanExporter traceExporter =
TraceExporter.createWithConfiguration(
TraceConfiguration.builder().setProjectId(projectId).build());
SdkTracerProvider sdkTracerProvider =
SdkTracerProvider.builder()
.setResource(resource)
.addSpanProcessor(SimpleSpanProcessor.create(traceExporter))
.setSampler(Sampler.alwaysOn())
.build();
OpenTelemetry openTelemetry =
OpenTelemetrySdk.builder().setTracerProvider(sdkTracerProvider).buildAndRegisterGlobal();
TopicName topicName = TopicName.of(projectId, topicId);
Publisher publisher = null;
try {
// Create a publisher instance with the created OpenTelemetry object and enabling tracing.
publisher =
Publisher.newBuilder(topicName)
.setOpenTelemetry(openTelemetry)
.setEnableOpenTelemetryTracing(true)
.build();
String message = "Hello World!";
ByteString data = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
// Once published, returns a server-assigned message id (unique within the topic)
ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
String messageId = messageIdFuture.get();
System.out.println("Published message ID: " + messageId);
} finally {
if (publisher != null) {
// When finished with the publisher, shutdown to free up resources.
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}
}
}
Receive messages with tracing
Go
import (
"context"
"fmt"
"io"
"sync/atomic"
"time"
"cloud.google.com/go/pubsub"
texporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.24.0"
"google.golang.org/api/option"
)
func subscribeOpenTelemetryTracing(w io.Writer, projectID, subID string, sampleRate float64) error {
// projectID := "my-project-id"
// subID := "my-sub"
// sampleRate := 1.0
ctx := context.Background()
exporter, err := texporter.New(texporter.WithProjectID(projectID),
// Disable spans created by the exporter.
texporter.WithTraceClientOptions(
[]option.ClientOption{option.WithTelemetryDisabled()},
),
)
if err != nil {
return fmt.Errorf("error instantiating exporter: %w", err)
}
resources := resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNameKey.String("subscriber"),
)
// Instantiate a tracer provider with the following settings
tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exporter),
sdktrace.WithResource(resources),
sdktrace.WithSampler(
sdktrace.ParentBased(sdktrace.TraceIDRatioBased(sampleRate)),
),
)
defer tp.ForceFlush(ctx) // flushes any pending spans
otel.SetTracerProvider(tp)
// Create a new client with tracing enabled.
client, err := pubsub.NewClientWithConfig(ctx, projectID, &pubsub.ClientConfig{
EnableOpenTelemetryTracing: true,
})
if err != nil {
return fmt.Errorf("pubsub.NewClient: %w", err)
}
defer client.Close()
sub := client.Subscription(subID)
// Receive messages for 10 seconds, which simplifies testing.
// Comment this out in production, since `Receive` should
// be used as a long running operation.
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
var received int32
err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
atomic.AddInt32(&received, 1)
msg.Ack()
})
if err != nil {
return fmt.Errorf("sub.Receive: %w", err)
}
fmt.Fprintf(w, "Received %d messages\n", received)
return nil
}
C++
#include "google/cloud/opentelemetry/configure_basic_tracing.h"
#include "google/cloud/opentelemetry_options.h"
#include "google/cloud/pubsub/message.h"
#include "google/cloud/pubsub/publisher.h"
#include "google/cloud/pubsub/subscriber.h"
#include "google/cloud/pubsub/subscription.h"
#include <chrono>
#include <future>
#include <iostream>
#include <string>
int main(int argc, char* argv[]) try {
if (argc != 4) {
std::cerr << "Usage: " << argv[0]
<< " <project-id> <topic-id> <subscription-id>\n";
return 1;
}
std::string const project_id = argv[1];
std::string const topic_id = argv[2];
std::string const subscription_id = argv[3];
// Create a few namespace aliases to make the code easier to read.
namespace gc = ::google::cloud;
namespace otel = gc::otel;
namespace pubsub = gc::pubsub;
auto constexpr kWaitTimeout = std::chrono::seconds(30);
auto project = gc::Project(project_id);
auto configuration = otel::ConfigureBasicTracing(project);
// Publish a message with tracing enabled.
auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
pubsub::Topic(project_id, topic_id),
gc::Options{}.set<gc::OpenTelemetryTracingOption>(true)));
// Block until the message is actually sent and throw on error.
auto id = publisher.Publish(pubsub::MessageBuilder().SetData("Hi!").Build())
.get()
.value();
std::cout << "Sent message with id: (" << id << ")\n";
// Receive a message using streaming pull with tracing enabled.
auto subscriber = pubsub::Subscriber(pubsub::MakeSubscriberConnection(
pubsub::Subscription(project_id, subscription_id),
gc::Options{}.set<gc::OpenTelemetryTracingOption>(true)));
auto session =
subscriber.Subscribe([&](pubsub::Message const& m, pubsub::AckHandler h) {
std::cout << "Received message " << m << "\n";
std::move(h).ack();
});
std::cout << "Waiting for messages on " + subscription_id + "...\n";
// Blocks until the timeout is reached.
auto result = session.wait_for(kWaitTimeout);
if (result == std::future_status::timeout) {
std::cout << "timeout reached, ending session\n";
session.cancel();
}
return 0;
} catch (google::cloud::Status const& status) {
std::cerr << "google::cloud::Status thrown: " << status << "\n";
return 1;
}
Python
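# On Python versions before 3.11, the timeout raised by a future's result()
# is concurrent.futures.TimeoutError, which is not the built-in TimeoutError.
from concurrent.futures import TimeoutError
from opentelemetry import trace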
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
BatchSpanProcessor,
)
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.trace.sampling import TraceIdRatioBased, ParentBased
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1 import SubscriberClient
from google.cloud.pubsub_v1.types import SubscriberOptions
# TODO(developer)
# subscription_project_id = "your-subscription-project-id"
# subscription_id = "your-subscription-id"
# cloud_trace_project_id = "your-cloud-trace-project-id"
# timeout = 300.0
# In this sample, we use Google Cloud Trace to export the OpenTelemetry
# traces: https://cloud.google.com/trace/docs/setup/python-ot
# Choose and configure the exporter for your setup accordingly.
sampler = ParentBased(root=TraceIdRatioBased(1))
trace.set_tracer_provider(TracerProvider(sampler=sampler))
# Export to Google Trace
cloud_trace_exporter = CloudTraceSpanExporter(
project_id=cloud_trace_project_id,
)
trace.get_tracer_provider().add_span_processor(
BatchSpanProcessor(cloud_trace_exporter)
)
# Set the `enable_open_telemetry_tracing` option to True when creating
# the subscriber client. This in itself is necessary and sufficient for
# the library to export OpenTelemetry traces. However, where the traces
# must be exported to needs to be configured based on your OpenTelemetry
# set up. Refer: https://opentelemetry.io/docs/languages/python/exporters/
subscriber = SubscriberClient(
subscriber_options=SubscriberOptions(enable_open_telemetry_tracing=True)
)
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(
subscription_project_id, subscription_id
)
# Define callback to be called when a message is received.
def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    # Ack message after processing it.
    print(message.data)
    message.ack()
# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # Optimistically subscribe to messages on the subscription.
        streaming_pull_future = subscriber.subscribe(
            subscription_path, callback=callback
        )
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        print("Successfully subscribed until the timeout passed.")
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.
TypeScript
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_OR_ID';
// Imports the Google Cloud client library
import {Message, PubSub} from '@google-cloud/pubsub';
// Imports the OpenTelemetry API
import {NodeTracerProvider} from '@opentelemetry/sdk-trace-node';
import {diag, DiagConsoleLogger, DiagLogLevel} from '@opentelemetry/api';
import {SimpleSpanProcessor} from '@opentelemetry/sdk-trace-base';
// To output to the console for testing, use the ConsoleSpanExporter.
// import {ConsoleSpanExporter} from '@opentelemetry/sdk-trace-base';
// To output to Cloud Trace, import the OpenTelemetry bridge library.
import {TraceExporter} from '@google-cloud/opentelemetry-cloud-trace-exporter';
import {Resource} from '@opentelemetry/resources';
import {SEMRESATTRS_SERVICE_NAME} from '@opentelemetry/semantic-conventions';
// Enable the diagnostic logger for OpenTelemetry
diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.DEBUG);
// Log spans out to the console, for testing.
// const exporter = new ConsoleSpanExporter();
// Log spans out to Cloud Trace, for production.
const exporter = new TraceExporter();
// Build a tracer provider and a span processor to do
// something with the spans we're generating.
const provider = new NodeTracerProvider({
resource: new Resource({
[SEMRESATTRS_SERVICE_NAME]: 'otel subscriber example',
}),
});
const processor = new SimpleSpanProcessor(exporter);
provider.addSpanProcessor(processor);
provider.register();
// Creates a client; cache this for further use.
const pubSubClient = new PubSub({enableOpenTelemetryTracing: true});
async function subscriptionListen(subscriptionNameOrId: string) {
const subscriber = pubSubClient.subscription(subscriptionNameOrId);
// Message handler for subscriber
const messageHandler = async (message: Message) => {
console.log(`Message ${message.id} received.`);
message.ack();
};
// Error handler for subscriber
const errorHandler = async (error: Error) => {
console.log('Received error:', error);
};
// Listens for new messages from the topic
subscriber.on('message', messageHandler);
subscriber.on('error', errorHandler);
// Ensures that all spans got flushed by the exporter. This function
// is in service to making sure that any buffered Pub/Sub messages
// and/or OpenTelemetry spans are properly flushed to the server
// side. In normal usage, you'd only need to do something like this
// on process shutdown.
// OTEL_TIMEOUT and SUBSCRIBER_TIMEOUT are not defined in this excerpt;
// the values below (in seconds) are assumptions for illustration.
const OTEL_TIMEOUT = 2;
const SUBSCRIBER_TIMEOUT = 10;
async function shutdown() {
await subscriber.close();
await processor.forceFlush();
await new Promise(r => setTimeout(r, OTEL_TIMEOUT * 1000));
}
// Wait a bit for the subscription to receive messages, then shut down
// gracefully. This is for the sample only; normally you would not need
// this delay.
await new Promise<void>(r =>
setTimeout(async () => {
subscriber.removeAllListeners();
await shutdown();
r();
}, SUBSCRIBER_TIMEOUT * 1000),
);
}
Node.js
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_OR_ID';
// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');
// Imports the OpenTelemetry API
const {NodeTracerProvider} = require('@opentelemetry/sdk-trace-node');
const {diag, DiagConsoleLogger, DiagLogLevel} = require('@opentelemetry/api');
const {SimpleSpanProcessor} = require('@opentelemetry/sdk-trace-base');
// To output to the console for testing, use the ConsoleSpanExporter.
// const {ConsoleSpanExporter} = require('@opentelemetry/sdk-trace-base');
// To output to Cloud Trace, import the OpenTelemetry bridge library.
const {
TraceExporter,
} = require('@google-cloud/opentelemetry-cloud-trace-exporter');
const {Resource} = require('@opentelemetry/resources');
const {
SEMRESATTRS_SERVICE_NAME,
} = require('@opentelemetry/semantic-conventions');
// Enable the diagnostic logger for OpenTelemetry
diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.DEBUG);
// Log spans out to the console, for testing.
// const exporter = new ConsoleSpanExporter();
// Log spans out to Cloud Trace, for production.
const exporter = new TraceExporter();
// Build a tracer provider and a span processor to do
// something with the spans we're generating.
const provider = new NodeTracerProvider({
resource: new Resource({
[SEMRESATTRS_SERVICE_NAME]: 'otel subscriber example',
}),
});
const processor = new SimpleSpanProcessor(exporter);
provider.addSpanProcessor(processor);
provider.register();
// Creates a client; cache this for further use.
const pubSubClient = new PubSub({enableOpenTelemetryTracing: true});
async function subscriptionListen(subscriptionNameOrId) {
const subscriber = pubSubClient.subscription(subscriptionNameOrId);
// Message handler for subscriber
const messageHandler = async message => {
console.log(`Message ${message.id} received.`);
message.ack();
};
// Error handler for subscriber
const errorHandler = async error => {
console.log('Received error:', error);
};
// Listens for new messages from the topic
subscriber.on('message', messageHandler);
subscriber.on('error', errorHandler);
// Ensures that all spans got flushed by the exporter. This function
// is in service to making sure that any buffered Pub/Sub messages
// and/or OpenTelemetry spans are properly flushed to the server
// side. In normal usage, you'd only need to do something like this
// on process shutdown.
// OTEL_TIMEOUT and SUBSCRIBER_TIMEOUT are not defined in this excerpt;
// the values below (in seconds) are assumptions for illustration.
const OTEL_TIMEOUT = 2;
const SUBSCRIBER_TIMEOUT = 10;
async function shutdown() {
await subscriber.close();
await processor.forceFlush();
await new Promise(r => setTimeout(r, OTEL_TIMEOUT * 1000));
}
// Wait a bit for the subscription to receive messages, then shut down
// gracefully. This is for the sample only; normally you would not need
// this delay.
await new Promise(r =>
setTimeout(async () => {
subscriber.removeAllListeners();
await shutdown();
r();
}, SUBSCRIBER_TIMEOUT * 1000),
);
}
Java
import com.google.cloud.opentelemetry.trace.TraceConfiguration;
import com.google.cloud.opentelemetry.trace.TraceExporter;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import io.opentelemetry.semconv.ResourceAttributes;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class OpenTelemetrySubscriberExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String subscriptionId = "your-subscription-id";
openTelemetrySubscriberExample(projectId, subscriptionId);
}
public static void openTelemetrySubscriberExample(String projectId, String subscriptionId) {
Resource resource =
Resource.getDefault().toBuilder()
.put(ResourceAttributes.SERVICE_NAME, "subscriber-example")
.build();
// Creates a Cloud Trace exporter.
SpanExporter traceExporter =
TraceExporter.createWithConfiguration(
TraceConfiguration.builder().setProjectId(projectId).build());
SdkTracerProvider sdkTracerProvider =
SdkTracerProvider.builder()
.setResource(resource)
.addSpanProcessor(SimpleSpanProcessor.create(traceExporter))
.setSampler(Sampler.alwaysOn())
.build();
OpenTelemetry openTelemetry =
OpenTelemetrySdk.builder().setTracerProvider(sdkTracerProvider).buildAndRegisterGlobal();
ProjectSubscriptionName subscriptionName =
ProjectSubscriptionName.of(projectId, subscriptionId);
// Instantiate an asynchronous message receiver.
MessageReceiver receiver =
(PubsubMessage message, AckReplyConsumer consumer) -> {
// Handle incoming message, then ack the received message.
System.out.println("Id: " + message.getMessageId());
System.out.println("Data: " + message.getData().toStringUtf8());
consumer.ack();
};
Subscriber subscriber = null;
try {
subscriber =
Subscriber.newBuilder(subscriptionName, receiver)
.setOpenTelemetry(openTelemetry)
.setEnableOpenTelemetryTracing(true)
.build();
// Start the subscriber.
subscriber.startAsync().awaitRunning();
System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
// Allow the subscriber to run for 30s unless an unrecoverable error occurs.
subscriber.awaitTerminated(30, TimeUnit.SECONDS);
} catch (TimeoutException timeoutException) {
// Shut down the subscriber after 30s. Stop receiving messages.
subscriber.stopAsync();
}
}
}
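Analyze traces
The following sections describe how to track and analyze a trace in the Google Cloud console.
Considerations
- When you publish a batch of messages, the publish RPC span is captured in a separate trace.
- A publish RPC has multiple origin spans, because multiple create calls can result in a single publish RPC when they are batched together.
Spans in OpenTelemetry can have zero or one parent span.
A span that represents a batched operation, such as a publish batch, logically has multiple parents and therefore cannot be represented with zero or one parent span.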
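The single-parent constraint above is why a batched publish RPC span lives in its own trace and refers to its origin spans through links instead of a parent. The following Python sketch models that relationship with the standard library only. The Span class, the batch_publish_span helper, and the span names are hypothetical and are not the OpenTelemetry or Pub/Sub API.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


@dataclass
class Span:
    """Hypothetical, simplified span: at most one parent, any number of links."""
    name: str
    trace_id: int
    span_id: int
    parent_span_id: Optional[int] = None
    # Each link is a (trace_id, span_id) pair that points at another span.
    links: List[Tuple[int, int]] = field(default_factory=list)


def batch_publish_span(create_spans, trace_id, span_id):
    """Build a parentless publish RPC span that links to every origin span."""
    links = [(s.trace_id, s.span_id) for s in create_spans]
    return Span("publish RPC", trace_id, span_id, parent_span_id=None, links=links)


if __name__ == "__main__":
    # Three messages, each with its own trace and its own create span.
    creates = [Span("create", trace_id=i, span_id=100 + i) for i in (1, 2, 3)]
    rpc = batch_publish_span(creates, trace_id=99, span_id=1)
    print(f"{rpc.name} has no parent and {len(rpc.links)} links")
```

When you inspect a publish RPC span, follow its links to reach the individual message traces that were batched into that call.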
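Track spans created during the message lifecycle
The following image shows an example of the spans that are created in a single trace for a single message.
Each span can carry additional attributes that provide more information, such as the message byte size and ordering key information.
Span attributes convey additional metadata such as the message's ordering key, message ID, and message size.
The main publish and subscribe spans are augmented with span events that correspond to when a network call is issued and when it completes.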
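Troubleshoot common issues
The following issues can cause problems with tracing:
- The service account that you use to export traces doesn't have the required roles/cloudtrace.agent role.
- The quota for the maximum number of ingested spans in Cloud Trace has been reached.
- Your application terminates without calling the appropriate flush function.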
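The last issue is easier to see with a small model. The following Python sketch is a toy stand-in for a batching span processor, written with the standard library only; it is not the OpenTelemetry BatchSpanProcessor, and the class and method names are assumptions for illustration. It shows that spans stay in an in-memory buffer until the batch fills up or a flush is requested, so a process that exits without flushing loses them.

```python
class BufferingProcessor:
    """Toy stand-in for a batching span processor; not the OpenTelemetry class."""

    def __init__(self, export, max_batch_size=512):
        self._export = export  # Callable that receives a list of spans.
        self._max_batch_size = max_batch_size
        self._buffer = []

    def on_end(self, span):
        """Buffer a finished span and export once the batch is full."""
        self._buffer.append(span)
        if len(self._buffer) >= self._max_batch_size:
            self.force_flush()

    def force_flush(self):
        """Export whatever is buffered, even if the batch is not full."""
        if self._buffer:
            batch, self._buffer = self._buffer, []
            self._export(batch)


if __name__ == "__main__":
    exported = []
    processor = BufferingProcessor(exported.extend, max_batch_size=100)
    try:
        for i in range(3):
            processor.on_end(f"span-{i}")
        print(f"exported before flush: {len(exported)}")
    finally:
        # Flush in a finally block so buffered spans survive early exits.
        processor.force_flush()
    print(f"exported after flush: {len(exported)}")
```

In a real application, call the flush or shutdown function that your tracer provider or span processor exposes, as the samples on this page do with ForceFlush and forceFlush.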