Dataflow を使用して Pub/Sub Lite メッセージをストリーミングする

独自のデータ処理プログラムを作成して実行する代わりに、DataflowApache Beam の Pub/Sub Lite I/O コネクタとともに使用することもできます。Dataflow は、ストリーミング(リアルタイム)モードのデータとバッチモードのデータを同等の信頼性と明瞭度で変換、活用するフルマネージド サービスです。Dataflow は、Apache Beam SDK を使用して開発されたプログラムを確実に実行します。Apache Beam SDK は、強力なステートフル処理抽象化と、他のストリーミング システムとバッチシステムへの I/O コネクタで構成された拡張可能なセットです。

このクイックスタートでは、以下を行う Apache Beam パイプラインを作成する方法を示します。

  • Pub/Sub Lite からメッセージを読み取る
  • タイムスタンプごとにメッセージをウィンドウ処理(グループ化)する
  • Cloud Storage にメッセージを書き込む

また、次の方法についても説明します。

  • Dataflow で実行するため、パイプラインを送信する
  • パイプラインから Dataflow フレックス テンプレートを作成する

このチュートリアルでは Maven が必要ですが、サンプル プロジェクトを Maven から Gradle に変換することもできます。詳細については、省略可: Maven から Gradle に変換するをご覧ください。

始める前に

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  4. To initialize the gcloud CLI, run the following command:

    gcloud init
  5. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Make sure that billing is enabled for your Google Cloud project.

  7. Enable the Pub/Sub Lite, Dataflow, Google Cloud Storage JSON API, and Cloud Logging APIs:

    gcloud services enable pubsublite.googleapis.com  dataflow.googleapis.com  storage-api.googleapis.com  logging.googleapis.com
  8. Set up authentication:

    1. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    2. Grant roles to the service account. Run the following command once for each of the following IAM roles: roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsublite.admin:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
      • ROLE: the role to grant
    3. Grant the required role to the principal that will attach the service account to other resources.

      gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
      • USER_EMAIL: the email address for a Google Account
  9. Install the Google Cloud CLI.

  10. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  11. To initialize the gcloud CLI, run the following command:

    gcloud init
  12. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  13. Make sure that billing is enabled for your Google Cloud project.

  14. Enable the Pub/Sub Lite, Dataflow, Google Cloud Storage JSON API, and Cloud Logging APIs:

    gcloud services enable pubsublite.googleapis.com  dataflow.googleapis.com  storage-api.googleapis.com  logging.googleapis.com
  15. Set up authentication:

    1. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    2. Grant roles to the service account. Run the following command once for each of the following IAM roles: roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsublite.admin:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
      • ROLE: the role to grant
    3. Grant the required role to the principal that will attach the service account to other resources.

      gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
      • USER_EMAIL: the email address for a Google Account
  16. Create local authentication credentials for your user account:

    gcloud auth application-default login

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

Pub/Sub Lite プロジェクトを設定する

  1. Cloud Storage バケット、プロジェクト、Dataflow リージョンの変数を作成します。Cloud Storage のバケット名は、グローバルに一意にする必要があります。Dataflow リージョンは、ジョブを実行できる有効なリージョンである必要があります。リージョンとロケーションの詳細については、Dataflow のロケーションをご覧ください。

    export PROJECT_ID=$(gcloud config get-value project)
    export SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
    export BUCKET=BUCKET_NAME
    export DATAFLOW_REGION=DATAFLOW_REGION
  2. このプロジェクトが所有する Cloud Storage バケットを作成します。

       gcloud storage buckets create gs://$BUCKET

Pub/Sub Lite ゾーン Lite のトピックとサブスクリプションを作成する

ゾーン Lite Pub/Sub Lite トピックと Lite サブスクリプションを作成します。

Lite ロケーションの場合は、サポートされている Pub/Sub Lite のロケーションを選択します。また、リージョンのゾーンも指定する必要があります。例: us-central1-a

export TOPIC=LITE_TOPIC_ID
export SUBSCRIPTION=LITE_SUBSCRIPTION_ID
export LITE_LOCATION=LITE_LOCATION
gcloud pubsub lite-topics create $TOPIC \
      --location=$LITE_LOCATION \
      --partitions=1 \
      --per-partition-bytes=30GiB
gcloud pubsub lite-subscriptions create $SUBSCRIPTION \
      --location=$LITE_LOCATION \
      --topic=$TOPIC \
      --starting-offset=beginning

Dataflow にメッセージをストリーミングする

クイックスタート サンプルコードをダウンロードする

クイックスタート リポジトリのクローンを作成し、サンプルコード ディレクトリに移動します。

git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/pubsublite/streaming-analytics

サンプルコード

このサンプルコードでは、Dataflow を使用して次のことを行います。

  • 制限なしソースとして Pub/Sub Lite サブスクリプションからメッセージを読み取る。
  • 固定時間ウィンドウデフォルト トリガーを使用して、パブリッシュ タイムスタンプに基づいてメッセージをグループ化します。
  • グループ化したメッセージを Cloud Storage のファイルに書き込む。

Java

このサンプルを実行する前に、Pub/Sub Lite クライアント ライブラリの Java の設定手順を実施してください。


import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteIO;
import org.apache.beam.sdk.io.gcp.pubsublite.SubscriberOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PubsubliteToGcs {
  /*
   * Define your own configuration options. Add your arguments to be processed
   * by the command-line parser.
   */
  public interface PubsubliteToGcsOptions extends StreamingOptions {
    @Description("Your Pub/Sub Lite subscription.")
    @Required
    String getSubscription();

    void setSubscription(String value);

    @Description("Window size of output files in minutes.")
    @Default.Integer(1)
    Integer getWindowSize();

    void setWindowSize(Integer value);

    @Description("Filename prefix of output files.")
    @Required
    String getOutput();

    void setOutput(String value);
  }

  private static final Logger LOG = LoggerFactory.getLogger(PubsubliteToGcs.class);

  public static void main(String[] args) throws InterruptedException {
    // The maximum number of shards when writing output files.
    int numShards = 1;

    PubsubliteToGcsOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PubsubliteToGcsOptions.class);

    options.setStreaming(true);

    SubscriberOptions subscriberOptions =
        SubscriberOptions.newBuilder()
            .setSubscriptionPath(SubscriptionPath.parse(options.getSubscription()))
            .build();

    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply("Read From Pub/Sub Lite", PubsubLiteIO.read(subscriberOptions))
        .apply(
            "Convert messages",
            MapElements.into(TypeDescriptors.strings())
                .via(
                    (SequencedMessage sequencedMessage) -> {
                      String data = sequencedMessage.getMessage().getData().toStringUtf8();
                      LOG.info("Received: " + data);
                      long publishTime = sequencedMessage.getPublishTime().getSeconds();
                      return data + "\t" + publishTime;
                    }))
        .apply(
            "Apply windowing function",
            Window
                // Group the elements using fixed-sized time intervals based on the element
                // timestamp (using the default event time trigger). The element timestamp
                // is the publish timestamp associated with a message.
                //
                // NOTE: If data is not being continuously ingested, such as with a batch or
                // intermittent publisher, the final window will never close as the watermark
                // will not advance. If this is a possibility with your pipeline, you should
                // add an additional processing time trigger to force window closure after
                // enough time has passed. See
                // https://beam.apache.org/documentation/programming-guide/#triggers
                // for more information.
                .<String>into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
        .apply("Write elements to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards));

    // Execute the pipeline. You may add `.waitUntilFinish()` to observe logs in your console, but
    // `waitUntilFinish()` will not work in Dataflow Flex Templates.
    pipeline.run();
  }
}

Dataflow パイプラインを開始する

Dataflow でパイプラインを開始するには、次のコマンドを実行します。

mvn compile exec:java \
    -Dexec.mainClass=examples.PubsubliteToGcs \
    -Dexec.args=" \
        --subscription=projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION \
        --output=gs://$BUCKET/samples/output \
        --windowSize=1 \
        --project=$PROJECT_ID \
        --region=$DATAFLOW_REGION \
        --tempLocation=gs://$BUCKET/temp \
        --runner=DataflowRunner \
        --serviceAccount=$SERVICE_ACCOUNT"

上記のコマンドは Dataflow ジョブを起動します。コンソール出力のリンクをたどって、Dataflow モニタリング コンソールでジョブにアクセスします。

ジョブの進行状況を確認する

Dataflow コンソールでジョブの進行状況を確認します。

Dataflow コンソールに移動

[ジョブの詳細] ビューを開いて、次の情報を確認します。

  • ジョブグラフ
  • 実行の詳細
  • ジョブの指標

Lite トピックにメッセージを公開します。

gcloud pubsub lite-topics publish $TOPIC \
    --location=$LITE_LOCATION \
    --message="Hello World!"

ワーカーログにメッセージが表示されるまで、数分間かかる場合があります。

次のコマンドを使用して、Cloud Storage に書き込まれたファイルを確認します。

gcloud storage ls "gs://$BUCKET/samples/"

出力は次のようになります。

 gs://$BUCKET/samples/output-19:41-19:42-0-of-1
 gs://$BUCKET/samples/output-19:47-19:48-0-of-1
 gs://$BUCKET/samples/output-19:48-19:49-0-of-1

次のコマンドを使用して、ファイルの内容を確認します。

gcloud storage cat "gs://$BUCKET/samples/your-filename"

省略可: Dataflow テンプレートを作成する

必要に応じて、パイプラインに基づいたカスタム Dataflow フレックス テンプレートを作成できます。Dataflow テンプレートを使用すると、完全な Java 開発環境をセットアップしなくても、 Google Cloud コンソールやコマンドラインから異なる入力パラメータを使用してジョブを実行できます。

  1. パイプラインのすべての依存関係を含むファット JAR を作成します。コマンドの実行後、target/pubsublite-streaming-bundled-1.0.jar が表示されます。

    mvn clean package -DskipTests=true
  2. テンプレート ファイルとテンプレート コンテナ イメージの名前と場所を指定します。

    export TEMPLATE_PATH="gs://$BUCKET/samples/your-template-file.json"
    export TEMPLATE_IMAGE="gcr.io/$PROJECT_ID/your-template-image:latest"
  3. カスタム Flex テンプレートを作成します。この例では、ジョブの実行に必要な仕様を含む metadata.json ファイルが指定されています。

    gcloud dataflow flex-template build $TEMPLATE_PATH \
        --image-gcr-path $TEMPLATE_IMAGE \
        --sdk-language "JAVA" \
        --flex-template-base-image "JAVA11" \
        --metadata-file "metadata.json" \
        --jar "target/pubsublite-streaming-bundled-1.0.jar" \
        --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="examples.PubsubliteToGcs"
  4. カスタム Flex テンプレートを使用してジョブを実行します。

Console

  1. テンプレートからジョブを作成.

  2. ジョブ名を入力します。

  3. Dataflow リージョンを入力します。

  4. カスタム テンプレートを選択します。

  5. テンプレート パスを入力します。

  6. 必要なパラメータを入力します。

  7. [ジョブを実行] をクリックします。

gcloud

gcloud dataflow flex-template run "pubsublite-to-gcs-`date +%Y%m%d`" \
     --template-file-gcs-location $TEMPLATE_PATH \
     --parameters subscription="projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION" \
     --parameters output="gs://$BUCKET/samples/template-output" \
     --parameters windowSize=1 \
     --region $DATAFLOW_REGION \ 
     --serviceAccount=$SERVICE_ACCOUNT

クリーンアップ

このページで使用したリソースに対して Google Cloud アカウントで課金されないようにするには、 Google Cloud プロジェクトとそのリソースを削除します。

  1. Dataflow コンソールで、ジョブを停止します。パイプラインは、ドレインするのではなくキャンセルします。

  2. トピックとサブスクリプションを削除します。

    gcloud pubsub lite-topics delete $TOPIC
    gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
  3. パイプラインによって作成されたファイルを削除します。

    gcloud storage rm "gs://$BUCKET/samples/*" --recursive --continue-on-error
    gcloud storage rm "gs://$BUCKET/temp/*" --recursive --continue-on-error
  4. テンプレート イメージとテンプレート ファイルが存在する場合は、それらを削除します。

    gcloud container images delete $TEMPLATE_IMAGE
    gcloud storage rm $TEMPLATE_PATH
  5. Cloud Storage バケットを削除します。

    gcloud storage rm gs://$BUCKET --recursive

  6. サービス アカウントを削除します。
    gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
  7. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  8. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

次のステップ