Stream Pub/Sub Lite messages by using Dataflow

As an alternative to writing and running your own data processing programs, you can use Dataflow with the Pub/Sub Lite I/O connector for Apache Beam. Dataflow is a fully managed service for transforming and enriching data in streaming (real-time) and batch modes with equal reliability and expressiveness. It reliably executes programs developed using the Apache Beam SDK, which has an extensible set of powerful stateful processing abstractions and I/O connectors to other streaming and batch systems.

This quickstart shows you how to write an Apache Beam pipeline that will:

  • Read messages from Pub/Sub Lite
  • Window (or group) the messages by publish timestamp
  • Write the messages to Cloud Storage

It also shows you how to:

  • Submit your pipeline to run on Dataflow
  • Create a Dataflow Flex template from your pipeline

This tutorial requires Maven, but it's also possible to convert the example project from Maven to Gradle. To learn more, see Optional: Convert from Maven to Gradle.

Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  4. To initialize the gcloud CLI, run the following command:

    gcloud init
  5. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Pub/Sub Lite, Dataflow, Google Cloud Storage JSON API, and Cloud Logging APIs:

    gcloud services enable pubsublite.googleapis.com  dataflow.googleapis.com  storage-api.googleapis.com  logging.googleapis.com
  8. Set up authentication:

    1. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    2. Grant roles to the service account. Run the following command once for each of the following IAM roles: roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsublite.admin:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
      • ROLE: the role to grant
    3. Grant the required role to the principal that will attach the service account to other resources.

      gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
      • USER_EMAIL: the email address for a Google Account
  9. Create local authentication credentials for your user account:

    gcloud auth application-default login

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

Set up your Pub/Sub Lite project

  1. Create variables for your Cloud Storage bucket, project, and Dataflow region. Cloud Storage bucket names must be globally unique. The Dataflow region must be a valid region where you can run your job. For more information about regions and locations, see Dataflow locations.

    export PROJECT_ID=$(gcloud config get-value project)
    export SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
    export BUCKET=BUCKET_NAME
    export DATAFLOW_REGION=DATAFLOW_REGION
  2. Create a Cloud Storage bucket owned by this project:

       gcloud storage buckets create gs://$BUCKET

Create a Pub/Sub Lite topic and subscription

Create a regional Lite topic and a Lite subscription.

For the Lite location, choose a supported Pub/Sub Lite location. You must also specify a zone within the region, for example, us-central1-a.

export TOPIC=LITE_TOPIC_ID
export SUBSCRIPTION=LITE_SUBSCRIPTION_ID
export LITE_LOCATION=LITE_LOCATION
gcloud pubsub lite-topics create $TOPIC \
      --location=$LITE_LOCATION \
      --partitions=1 \
      --per-partition-bytes=30GiB
gcloud pubsub lite-subscriptions create $SUBSCRIPTION \
      --location=$LITE_LOCATION \
      --topic=$TOPIC \
      --starting-offset=beginning

Stream messages to Dataflow

Download the quickstart sample code

Clone the quickstart repository and navigate to the sample code directory:

git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/pubsublite/streaming-analytics

Sample code

This sample code uses Dataflow to:

  • Read messages from a Pub/Sub Lite subscription as an unbounded source.
  • Group messages by their publish timestamps, using fixed-time windows and the default trigger.
  • Write the grouped messages to files on Cloud Storage.

Java

Before running this sample, follow the Java setup instructions in the Pub/Sub Lite client libraries.


import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteIO;
import org.apache.beam.sdk.io.gcp.pubsublite.SubscriberOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PubsubliteToGcs {
  /*
   * Define your own configuration options. Add your arguments to be processed
   * by the command-line parser.
   */
  public interface PubsubliteToGcsOptions extends StreamingOptions {
    @Description("Your Pub/Sub Lite subscription.")
    @Required
    String getSubscription();

    void setSubscription(String value);

    @Description("Window size of output files in minutes.")
    @Default.Integer(1)
    Integer getWindowSize();

    void setWindowSize(Integer value);

    @Description("Filename prefix of output files.")
    @Required
    String getOutput();

    void setOutput(String value);
  }

  private static final Logger LOG = LoggerFactory.getLogger(PubsubliteToGcs.class);

  public static void main(String[] args) throws InterruptedException {
    // The maximum number of shards when writing output files.
    int numShards = 1;

    PubsubliteToGcsOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PubsubliteToGcsOptions.class);

    options.setStreaming(true);

    SubscriberOptions subscriberOptions =
        SubscriberOptions.newBuilder()
            .setSubscriptionPath(SubscriptionPath.parse(options.getSubscription()))
            .build();

    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply("Read From Pub/Sub Lite", PubsubLiteIO.read(subscriberOptions))
        .apply(
            "Convert messages",
            MapElements.into(TypeDescriptors.strings())
                .via(
                    (SequencedMessage sequencedMessage) -> {
                      String data = sequencedMessage.getMessage().getData().toStringUtf8();
                      LOG.info("Received: " + data);
                      long publishTime = sequencedMessage.getPublishTime().getSeconds();
                      return data + "\t" + publishTime;
                    }))
        .apply(
            "Apply windowing function",
            Window
                // Group the elements using fixed-sized time intervals based on the element
                // timestamp (using the default event time trigger). The element timestamp
                // is the publish timestamp associated with a message.
                //
                // NOTE: If data is not being continuously ingested, such as with a batch or
                // intermittent publisher, the final window will never close as the watermark
                // will not advance. If this is a possibility with your pipeline, you should
                // add an additional processing time trigger to force window closure after
                // enough time has passed. See
                // https://beam.apache.org/documentation/programming-guide/#triggers
                // for more information.
                .<String>into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
        .apply("Write elements to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards));

    // Execute the pipeline. You may add `.waitUntilFinish()` to observe logs in your console, but
    // `waitUntilFinish()` will not work in Dataflow Flex Templates.
    pipeline.run();
  }
}
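
As the NOTE in the windowing step explains, if publishing stops, the final window never closes because the watermark stops advancing. A minimal sketch of a replacement windowing transform that adds an early processing-time firing, assuming Beam's built-in AfterWatermark and AfterProcessingTime triggers (the one-minute delay is an arbitrary example value):

import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

// Sketch: a possible substitute for the "Apply windowing function" step
// above when the publisher may pause or stop.
static Window<String> windowWithEarlyFiring(int windowSizeMinutes) {
  return Window.<String>into(FixedWindows.of(Duration.standardMinutes(windowSizeMinutes)))
      // Fire when the watermark passes the end of the window...
      .triggering(
          AfterWatermark.pastEndOfWindow()
              // ...and also one minute of processing time after the first
              // element in a pane, so buffered data is emitted even if the
              // watermark stalls because publishing stopped.
              .withEarlyFirings(
                  AfterProcessingTime.pastFirstElementInPane()
                      .plusDelayOf(Duration.standardMinutes(1))))
      // Required when overriding the trigger; no late data is expected here.
      .withAllowedLateness(Duration.ZERO)
      // Emit each element at most once; early panes are not re-emitted
      // when the window finally closes.
      .discardingFiredPanes();
}

Note that with early firings a window can emit more than one pane, so a sink that names files only by window, such as WriteOneFilePerWindow, may need adjusting to avoid collisions.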

Start the Dataflow pipeline

To start the pipeline in Dataflow, run the following command:

mvn compile exec:java \
    -Dexec.mainClass=examples.PubsubliteToGcs \
    -Dexec.args=" \
        --subscription=projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION \
        --output=gs://$BUCKET/samples/output \
        --windowSize=1 \
        --project=$PROJECT_ID \
        --region=$DATAFLOW_REGION \
        --tempLocation=gs://$BUCKET/temp \
        --runner=DataflowRunner \
        --serviceAccount=$SERVICE_ACCOUNT"

The preceding command launches a Dataflow job. Follow the link in the console output to access the job in the Dataflow monitoring console.

Observe job progress

View the job's progress in the Dataflow console.

Go to the Dataflow console

Open the job details view to see the following:

  • Job graph
  • Execution details
  • Job metrics

Publish some messages to the Lite topic.

gcloud pubsub lite-topics publish $TOPIC \
    --location=$LITE_LOCATION \
    --message="Hello World!"

You may have to wait a few minutes for the messages to appear in the worker logs.

Use the following command to check which files have been written to Cloud Storage.

gcloud storage ls "gs://$BUCKET/samples/"

The output should look similar to the following:

 gs://$BUCKET/samples/output-19:41-19:42-0-of-1
 gs://$BUCKET/samples/output-19:47-19:48-0-of-1
 gs://$BUCKET/samples/output-19:48-19:49-0-of-1

Use the following command to view the contents of a file:

gcloud storage cat "gs://$BUCKET/samples/your-filename"
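
Each line of a file contains the message data and its publish time in epoch seconds, tab-separated, as produced by the "Convert messages" step of the pipeline. For example, with an illustrative timestamp:

 Hello World!	1634120519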

Optional: Create a Dataflow template

Optionally, you can create a custom Dataflow Flex template based on your pipeline. Dataflow templates let you run jobs with different input parameters from the Google Cloud console or the command line, without the need to set up a full Java development environment.

  1. Create a fat JAR that includes all the dependencies of your pipeline. You should see target/pubsublite-streaming-bundled-1.0.jar after the command runs.

    mvn clean package -DskipTests=true
  2. Provide names and locations for your template file and template container image.

    export TEMPLATE_PATH="gs://$BUCKET/samples/your-template-file.json"
    export TEMPLATE_IMAGE="gcr.io/$PROJECT_ID/your-template-image:latest"
  3. Build a custom Flex template. The example provides the required metadata.json file, which contains the necessary specification for running the job.

    gcloud dataflow flex-template build $TEMPLATE_PATH \
        --image-gcr-path $TEMPLATE_IMAGE \
        --sdk-language "JAVA" \
        --flex-template-base-image "JAVA11" \
        --metadata-file "metadata.json" \
        --jar "target/pubsublite-streaming-bundled-1.0.jar" \
        --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="examples.PubsubliteToGcs"
  4. Run a job using the custom Flex template.

控制台

  1. Create job from template.

  2. Enter a Job name.

  3. Enter your Dataflow region.

  4. Choose your Custom Template.

  5. Enter your template path.

  6. Enter the required parameters.

  7. Click Run job.

gcloud

gcloud dataflow flex-template run "pubsublite-to-gcs-`date +%Y%m%d`" \
     --template-file-gcs-location $TEMPLATE_PATH \
     --parameters subscription="projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION" \
     --parameters output="gs://$BUCKET/samples/template-output" \
     --parameters windowSize=1 \
     --region $DATAFLOW_REGION \
     --service-account-email=$SERVICE_ACCOUNT

Clean up

To avoid incurring charges to your Google Cloud account for the resources used on this page, delete the Google Cloud project that contains the resources.

  1. In the Dataflow console, stop the job. Cancel the pipeline instead of draining it.

  2. Delete the topic and subscription.

    gcloud pubsub lite-topics delete $TOPIC \
        --location=$LITE_LOCATION
    gcloud pubsub lite-subscriptions delete $SUBSCRIPTION \
        --location=$LITE_LOCATION
  3. Delete the files created by the pipeline.

    gcloud storage rm "gs://$BUCKET/samples/*" --recursive --continue-on-error
    gcloud storage rm "gs://$BUCKET/temp/*" --recursive --continue-on-error
  4. Delete the template image and the template file, if they exist.

    gcloud container images delete $TEMPLATE_IMAGE
    gcloud storage rm $TEMPLATE_PATH
  5. Remove the Cloud Storage bucket.

    gcloud storage rm gs://$BUCKET --recursive

  6. Delete the service account:

    gcloud iam service-accounts delete $SERVICE_ACCOUNT
  7. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  8. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

What's next