Dataflow を使用して Pub/Sub からメッセージをストリーミングする

Dataflow は、ストリーミング(リアルタイム)モードのデータとバッチモードのデータを同等の信頼性と表現力で変換、活用するフルマネージド サービスです。Apache Beam SDK を使用して、簡素化されたパイプライン開発環境を提供します。Apache Beam SDK は、ウィンドウ処理とセッション分析のプリミティブが豊富に用意されているだけでなく、ソースとシンクのコネクタからなるエコシステムも提供しています。このクイックスタートでは、Dataflow を使用して次の操作を行う方法を説明します。

  • Pub/Sub トピックにパブリッシュされたメッセージを読む
  • タイムスタンプごとにメッセージをウィンドウ処理(グループ化)する
  • Cloud Storage にメッセージを書き込む

このクイックスタートでは、Java と Python で Dataflow を使用する方法について説明します。SQL もサポートされます。このクイックスタートは、ご利用を開始していただくにあたり一時的な認証情報を提供する Google Cloud Skills Boost チュートリアルとしても掲載しています。

カスタムデータ処理を行う予定がない場合は、UI ベースの Dataflow テンプレートを使用することもできます。

始める前に

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON API, Pub/Sub, Resource Manager, and Cloud Scheduler APIs:

    gcloud services enable dataflow.googleapis.com  compute.googleapis.com  logging.googleapis.com  storage-component.googleapis.com  storage-api.googleapis.com  pubsub.googleapis.com  cloudresourcemanager.googleapis.com  cloudscheduler.googleapis.com
  7. Set up authentication:

    1. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    2. Grant roles to the service account. Run the following command once for each of the following IAM roles: roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
      • ROLE: the role to grant
    3. Grant the required role to the principal that will attach the service account to other resources.

      gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
      • USER_EMAIL: the email address for a Google Account
  8. Install the Google Cloud CLI.
  9. To initialize the gcloud CLI, run the following command:

    gcloud init
  10. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  11. Make sure that billing is enabled for your Google Cloud project.

  12. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON API, Pub/Sub, Resource Manager, and Cloud Scheduler APIs:

    gcloud services enable dataflow.googleapis.com  compute.googleapis.com  logging.googleapis.com  storage-component.googleapis.com  storage-api.googleapis.com  pubsub.googleapis.com  cloudresourcemanager.googleapis.com  cloudscheduler.googleapis.com
  13. Set up authentication:

    1. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    2. Grant roles to the service account. Run the following command once for each of the following IAM roles: roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
      • ROLE: the role to grant
    3. Grant the required role to the principal that will attach the service account to other resources.

      gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
      • USER_EMAIL: the email address for a Google Account
  14. Create local authentication credentials for your user account:

    gcloud auth application-default login

Pub/Sub プロジェクトを設定する

  1. バケット、プロジェクト、およびリージョンの変数を作成します。 Cloud Storage のバケット名は、グローバルに一意である必要がある。 このクイックスタートでコマンドを実行する場所に近接した Dataflow リージョンを選択します。 REGION 変数の値は有効なリージョン名にする必要があります。リージョンとロケーションの詳細については、Dataflow のロケーションをご覧ください。

    BUCKET_NAME=BUCKET_NAME
    PROJECT_ID=$(gcloud config get-value project)
    TOPIC_ID=TOPIC_ID
    REGION=DATAFLOW_REGION
    SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
  2. このプロジェクトが所有する Cloud Storage バケットを作成します。

    gcloud storage buckets create gs://$BUCKET_NAME
  3. このプロジェクトで Pub/Sub トピックを作成します。

    gcloud pubsub topics create $TOPIC_ID
  4. このプロジェクトで Cloud Scheduler ジョブを作成します。このジョブは、1 分間隔で Cloud Pub/Sub トピックにメッセージをパブリッシュします。

    App Engine アプリがプロジェクトに存在しない場合は、この手順で作成されます。

    gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \
        --topic=$TOPIC_ID --message-body="Hello!" --location=$REGION

    ジョブを開始します。

    gcloud scheduler jobs run publisher-job --location=$REGION
  5. 次のコマンドを使用して、クイックスタート リポジトリのクローンを作成し、サンプルコード ディレクトリに移動します。

    Java

    git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
    cd java-docs-samples/pubsub/streaming-analytics

    Python

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    cd python-docs-samples/pubsub/streaming-analytics
    pip install -r requirements.txt  # Install Apache Beam dependencies

Pub/Sub から Cloud Storage へのメッセージのストリーミング

コードサンプル

このサンプルコードでは、Dataflow を使用して次のことを行います。

  • Pub/Sub メッセージを読み取ります。
  • パブリッシュ タイムスタンプにより、固定サイズの間隔でメッセージをウィンドウ処理(グループ化)します。
  • 各ウィンドウのメッセージを Cloud Storage のファイルに書き込みます。

Java


import java.io.IOException;
import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

public class PubSubToGcs {
  /*
   * Define your own configuration options. Add your own arguments to be processed
   * by the command-line parser, and specify default values for them.
   */
  public interface PubSubToGcsOptions extends StreamingOptions {
    @Description("The Cloud Pub/Sub topic to read from.")
    @Required
    String getInputTopic();

    void setInputTopic(String value);

    @Description("Output file's window size in number of minutes.")
    @Default.Integer(1)
    Integer getWindowSize();

    void setWindowSize(Integer value);

    @Description("Path of the output file including its filename prefix.")
    @Required
    String getOutput();

    void setOutput(String value);
  }

  public static void main(String[] args) throws IOException {
    // The maximum number of shards when writing output.
    int numShards = 1;

    PubSubToGcsOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PubSubToGcsOptions.class);

    options.setStreaming(true);

    Pipeline pipeline = Pipeline.create(options);

    pipeline
        // 1) Read string messages from a Pub/Sub topic.
        .apply("Read PubSub Messages", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
        // 2) Group the messages into fixed-sized minute intervals.
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
        // 3) Write one file to GCS for every window of messages.
        .apply("Write Files to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards));

    // Execute the pipeline and wait until it finishes running.
    pipeline.run().waitUntilFinish();
  }
}

Python

import argparse
from datetime import datetime
import logging
import random

from apache_beam import (
    DoFn,
    GroupByKey,
    io,
    ParDo,
    Pipeline,
    PTransform,
    WindowInto,
    WithKeys,
)
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows


class GroupMessagesByFixedWindows(PTransform):
    """A composite transform that groups Pub/Sub messages based on publish time
    and outputs a list of tuples, each containing a message and its publish time.
    """

    def __init__(self, window_size, num_shards=5):
        # Set window size to 60 seconds.
        self.window_size = int(window_size * 60)
        self.num_shards = num_shards

    def expand(self, pcoll):
        return (
            pcoll
            # Bind window info to each element using element timestamp (or publish time).
            | "Window into fixed intervals"
            >> WindowInto(FixedWindows(self.window_size))
            | "Add timestamp to windowed elements" >> ParDo(AddTimestamp())
            # Assign a random key to each windowed element based on the number of shards.
            | "Add key" >> WithKeys(lambda _: random.randint(0, self.num_shards - 1))
            # Group windowed elements by key. All the elements in the same window must fit
            # memory for this. If not, you need to use `beam.util.BatchElements`.
            | "Group by key" >> GroupByKey()
        )


class AddTimestamp(DoFn):
    def process(self, element, publish_time=DoFn.TimestampParam):
        """Processes each windowed element by extracting the message body and its
        publish time into a tuple.
        """
        yield (
            element.decode("utf-8"),
            datetime.utcfromtimestamp(float(publish_time)).strftime(
                "%Y-%m-%d %H:%M:%S.%f"
            ),
        )


class WriteToGCS(DoFn):
    def __init__(self, output_path):
        self.output_path = output_path

    def process(self, key_value, window=DoFn.WindowParam):
        """Write messages in a batch to Google Cloud Storage."""

        ts_format = "%H:%M"
        window_start = window.start.to_utc_datetime().strftime(ts_format)
        window_end = window.end.to_utc_datetime().strftime(ts_format)
        shard_id, batch = key_value
        filename = "-".join([self.output_path, window_start, window_end, str(shard_id)])

        with io.gcsio.GcsIO().open(filename=filename, mode="w") as f:
            for message_body, publish_time in batch:
                f.write(f"{message_body},{publish_time}\n".encode())


def run(input_topic, output_path, window_size=1.0, num_shards=5, pipeline_args=None):
    # Set `save_main_session` to True so DoFns can access globally imported modules.
    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True, save_main_session=True
    )

    with Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            # Because `timestamp_attribute` is unspecified in `ReadFromPubSub`, Beam
            # binds the publish time returned by the Pub/Sub server for each message
            # to the element's timestamp parameter, accessible via `DoFn.TimestampParam`.
            # https://beam.apache.org/releases/pydoc/current/apache_beam.io.gcp.pubsub.html#apache_beam.io.gcp.pubsub.ReadFromPubSub
            | "Read from Pub/Sub" >> io.ReadFromPubSub(topic=input_topic)
            | "Window into" >> GroupMessagesByFixedWindows(window_size, num_shards)
            | "Write to GCS" >> ParDo(WriteToGCS(output_path))
        )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input_topic",
        help="The Cloud Pub/Sub topic to read from."
        '"projects/<PROJECT_ID>/topics/<TOPIC_ID>".',
    )
    parser.add_argument(
        "--window_size",
        type=float,
        default=1.0,
        help="Output file's window size in minutes.",
    )
    parser.add_argument(
        "--output_path",
        help="Path of the output GCS file including the prefix.",
    )
    parser.add_argument(
        "--num_shards",
        type=int,
        default=5,
        help="Number of shards to use when writing windowed elements to GCS.",
    )
    known_args, pipeline_args = parser.parse_known_args()

    run(
        known_args.input_topic,
        known_args.output_path,
        known_args.window_size,
        known_args.num_shards,
        pipeline_args,
    )

パイプラインの開始

パイプラインを開始するには、次のコマンドを実行します。

Java

mvn compile exec:java \
  -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \
  -Dexec.cleanupDaemonThreads=false \
  -Dexec.args=" \
    --project=$PROJECT_ID \
    --region=$REGION \
    --inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \
    --output=gs://$BUCKET_NAME/samples/output \
    --gcpTempLocation=gs://$BUCKET_NAME/temp \
    --runner=DataflowRunner \
    --windowSize=2 \
    --serviceAccount=$SERVICE_ACCOUNT"

Python

python PubSubToGCS.py \
  --project=$PROJECT_ID \
  --region=$REGION \
  --input_topic=projects/$PROJECT_ID/topics/$TOPIC_ID \
  --output_path=gs://$BUCKET_NAME/samples/output \
  --runner=DataflowRunner \
  --window_size=2 \
  --num_shards=2 \
  --temp_location=gs://$BUCKET_NAME/temp \
  --service_account_email=$SERVICE_ACCOUNT

上記のコマンドがローカルで実行され、クラウドで実行される Dataflow ジョブを起動します。コマンドが JOB_MESSAGE_DETAILED: Workers have started successfully を返したら、Ctrl+C を使用してローカル プログラムを終了します。

ジョブとパイプラインの進行状況の確認

ジョブの進行状況は Dataflow コンソールで確認できます。

Dataflow コンソールに移動

ジョブの進行状況の確認

[ジョブの詳細] ビューを開いて、次の情報を確認します。

  • ジョブの構成
  • ジョブのログ
  • ステージ指標

ジョブの進行状況の確認

Cloud Storage に出力ファイルが表示されるまで数分間かかる場合があります。

ジョブの進行状況の確認

または、以下のコマンドラインを使用して、書き出されたファイルを確認します。

gcloud storage ls gs://${BUCKET_NAME}/samples/

出力は次のようになります。

Java

gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1

Python

gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-1

クリーンアップ

このページで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、Google Cloud プロジェクトとそのリソースをまとめて削除してください。

  1. Cloud Scheduler ジョブを削除します。

    gcloud scheduler jobs delete publisher-job --location=$REGION
  2. Dataflow コンソールで、ジョブを停止します。パイプラインをドレインせずにキャンセルします。

  3. トピックを削除します。

    gcloud pubsub topics delete $TOPIC_ID
  4. パイプラインによって作成されたファイルを削除します。

    gcloud storage rm "gs://${BUCKET_NAME}/samples/output*" --recursive --continue-on-error
    gcloud storage rm "gs://${BUCKET_NAME}/temp/*" --recursive --continue-on-error
  5. Cloud Storage バケットを削除します。

    gcloud storage rm gs://${BUCKET_NAME} --recursive

  6. サービス アカウントを削除します。
    gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
  7. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  8. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

次のステップ