Pub/Sub を Apache Kafka に接続する

このドキュメントでは、Pub/Sub グループ Kafka コネクタを使用して Apache Kafka と Pub/Sub Lite を統合する方法について説明します。

Pub/Sub Kafka コネクタの概要

Apache Kafka は、イベント ストリーミングのオープンソース プラットフォームです。分散型アーキテクチャでよく使用され、疎結合のコンポーネント間の通信を可能にします。Pub/Sub Lite は、メッセージを非同期で送受信するためのマネージド サービスです。Kafka と同様に、Pub/Sub Lite を使用してクラウド アーキテクチャ内のコンポーネント間で通信できます。

Pub/Sub グループの Kafka コネクタを使用すると、これら 2 つのシステムを統合できます。以下のコネクタがコネクタ JAR にパッケージ化されています。

  • シンクコネクタは、1 つ以上の Kafka トピックからレコードを読み取り、Pub/Sub Life に公開します。
  • ソースコネクタは、Pub/Sub Lite トピックからメッセージを読み取り、Kafka に公開します。

Pub/Sub グループの Kafka コネクタを使用する可能性のあるいくつかのシナリオを次に示します。

  • Kafka ベースのアーキテクチャを Google Cloud に移行しようとしている。
  • Google Cloud の外部に Kafka にイベントを保存するフロントエンド システムがあるが、Kafka イベントを受信する必要がある一部のバックエンド サービスを実行するために Google Cloud も使用している。
  • オンプレミスの Kafka ソリューションからログを収集して、Google Cloud に送信してデータ分析を行う。
  • Google Cloud を使用するフロントエンド システムがあるが、Kafka を使用してオンプレミスにもデータを保存している。

このコネクタには、Kafka と他のシステム間でデータをストリーミングするためのフレームワークである Kafka コネクト が必要です。このコネクタを使用するには、Kafka クラスタとともに Kafka Connect を実行する必要があります。

このドキュメントは、Kafka と Pub/Sub Lite の両方に慣れていることを前提としています。Pub/Sub Lite の使用を開始するには、Google Cloud コンソールを使用して Pub/Sub Lite でメッセージを公開および受信するをご覧ください。

Pub/Sub グループの Kafka コネクタを使ってみる

このセクションでは、次のタスクについて説明します。

  1. Pub/Sub グループの Kafka コネクタを構成する
  2. Kafka から Pub/Sub Lite にイベントを送信する。
  3. Pub/Sub Lite から Kafka にメッセージを送信する。

前提条件

Kafka をインストールする

Apache Kafka クイックスタートに沿って、ローカルマシンに単一ノードの Kafka をインストールします。クイックスタートで、次の手順を行います。

  1. 最新の Kafka リリースをダウンロードして展開します。
  2. Kafka 環境を起動します。
  3. Kafka トピックを作成します。

認証

Pub/Sub メッセージを送受信するために、Pub/Sub グループの Kafka コネクタが Pub/Sub による認証を行う必要があります。認証を設定するには、次の手順を実行します。

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  4. To initialize the gcloud CLI, run the following command:

    gcloud init
  5. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Create local authentication credentials for your user account:

    gcloud auth application-default login

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  7. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/pubsublite.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Replace the following:

    • PROJECT_ID: your project ID.
    • USER_IDENTIFIER: the identifier for your user account—for example, myemail@example.com.
    • ROLE: the IAM role that you grant to your user account.
  8. Install the Google Cloud CLI.

  9. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  10. To initialize the gcloud CLI, run the following command:

    gcloud init
  11. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  12. Create local authentication credentials for your user account:

    gcloud auth application-default login

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  13. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/pubsublite.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Replace the following:

    • PROJECT_ID: your project ID.
    • USER_IDENTIFIER: the identifier for your user account—for example, myemail@example.com.
    • ROLE: the IAM role that you grant to your user account.

コネクタ JAR をダウンロードする

ローカルマシンにコネクタの JAR ファイルをダウンロードします。詳細については、GitHub ReadMe のコネクタの取得をご覧ください。

コネクタ構成ファイルをコピーする

  1. コネクタの GitHub リポジトリのクローンを作成するか、ダウンロードします。

    git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git
    cd java-pubsub-group-kafka-connector
    
  2. config ディレクトリの内容を Kafka インストールの config サブディレクトリにコピーします。

    cp config/* [path to Kafka installation]/config/
    

これらのファイルには、コネクタの構成設定が含まれています。

Kafka Connect 構成を更新する

  1. ダウンロードした Kafka コネクト バイナリが含まれているディレクトリに移動します。
  2. Kafka コネクト バイナリ ディレクトリにある config/connect-standalone.properties という名前のファイルをテキスト エディタで開きます。
  3. plugin.path property がコメントアウトされている場合は、コメント化解除します。
  4. plugin.path property を更新して、コネクタ JAR へのパスを追加します。

    例:

    plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
    
  5. offset.storage.file.filename プロパティをローカル ファイル名に設定します。スタンドアロン モードでは、Kafka はこのファイルを使用してオフセット データを保存します。

    例:

    offset.storage.file.filename=/tmp/connect.offsets
    

Kafka から Pub/Sub Lite にイベントを送信する

このセクションでは、シンクコネクタを起動し、Kafka にイベントをパブリッシュして、Pub/Sub Lite から転送されたメッセージを読み取る方法について説明します。

  1. Google Cloud CLI を使用して Pub/Sub Lite 予約を作成します。

    gcloud pubsub lite-reservations create RESERVATION_NAME \
    --location=LOCATION \
    --throughput-capacity=4

    以下を置き換えます。

  2. Google Cloud CLI を使用して、サブスクリプションを含む Pub/Sub Lite トピックを作成します。

    gcloud pubsub lite-topics create LITE_TOPIC \
    --location=LOCATION \
    --partitions=2 \
    --per-partition-bytes=30GiB \
    --throughput-reservation=RESERVATION_NAME
    
    gcloud pubsub lite-subscriptions create LITE_SUBSCRIPTION \
    --location=LOCATION \
    --topic=LITE_TOPIC

    以下を置き換えます。

    • LITE_TOPIC: Kafka からメッセージを受信する Pub/Sub Lite トピックの名前。
    • LOCATION: トピックのロケーション。値は予約のロケーションと一致する必要があります。
    • RESERVATION_NAME: Pub/Sub Lite 予約の名前。
    • LITE_SUBSCRIPTION: トピックの Pub/Sub Lite サブスクリプションの名前。
  3. テキスト エディタで /config/pubsub-lite-sink-connector.properties というファイルを開きます。コメントで "TODO" とマークされている次のプロパティの値を追加します。

    topics=KAFKA_TOPICS
    pubsublite.project=PROJECT_ID
    pubsublite.location=LOCATION
    pubsublite.topic=LITE_TOPIC

    以下を置き換えます。

    • KAFKA_TOPICS: 読み取る Kafka トピックのカンマ区切りのリスト。
    • PROJECT_ID: Pub/Sub Lite トピックを含む Google Cloud プロジェクト。
    • LOCATION: Pub/Sub Lite トピックのロケーション。
    • LITE_TOPIC: Kafka からメッセージを受信する Pub/Sub Lite トピック。
  4. Kafka ディレクトリから次のコマンドを実行します。

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/pubsub-lite-sink-connector.properties
    
  5. Apache Kafka クイックスタートの手順に沿って、Kafka トピックにいくつかのイベントを書き込みます。

  6. Lite サブスクリプションからメッセージを受信で示される方法のいずれかを使用して、Pub/Sub Lite サブスクリプションに登録します。

Pub/Sub Lite から Kafka にメッセージを転送する。

このセクションでは、ソースコネクタの起動、Pub/Sub Lite へのメッセージのパブリッシュ、Kafka から転送されたメッセージの読み取りを行う方法について説明します。

  1. Google Cloud CLI を使用して Pub/Sub Lite 予約を作成します。

    gcloud pubsub lite-reservations create RESERVATION_NAME \
    --location=LOCATION \
    --throughput-capacity=4

    以下を置き換えます。

  2. Google Cloud CLI を使用して、サブスクリプションを含む Pub/Sub Lite トピックを作成します。

    gcloud pubsub lite-topics create LITE_TOPIC \
    --location=LOCATION \
    --partitions=2 \
    --per-partition-bytes=30GiB \
    --throughput-reservation=RESERVATION_NAME
    
    gcloud pubsub lite-subscriptions create LITE_SUBSCRIPTION \
    --location=LOCATION \
    --topic=LITE_TOPIC

    以下を置き換えます。

    • LITE_TOPIC: Pub/Sub Lite トピックの名前。
    • LOCATION: トピックのロケーション。値は予約のロケーションと一致する必要があります。
    • RESERVATION_NAME: Pub/Sub Lite 予約の名前。
    • LITE_SUBSCRIPTION: トピックの Pub/Sub Lite サブスクリプションの名前。
  3. /config/pubsub-lite-source-connector.properties という名前のファイルをテキスト エディタで開きます。コメントで "TODO" とマークされている次のプロパティの値を追加します。

    topic=KAFKA_TOPIC
    pubsublite.project=PROJECT_ID
    pubsublite.location=LOCATION
    pubsublite.subscription=LITE_SUBSCRIPTION

    以下を置き換えます。

    • KAFKA_TOPIC: Pub/Sub メッセージを受信する Kafka トピック。
    • PROJECT_ID: Pub/Sub トピックを含む Google Cloud プロジェクト。
    • LOCATION: Pub/Sub Lite トピックのロケーション。
    • LITE_SUBSCRIPTION: Pub/Sub Lite トピック。
  4. Kafka ディレクトリから次のコマンドを実行します。

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/pubsub-lite-source-connector.properties
    
  5. Lite トピックにメッセージを公開で示される方法のいずれかを使用して、Pub/Sub Lite トピックにメッセージを公開します。

  6. Kafka からのメッセージを読み取ります。Apache Kafka クイックスタートの手順に沿って、Kafka トピックからメッセージを読み取ります。

メッセージ コンバージョン

Kafka レコードにはキーと値が含まれています。これらは可変長のバイト配列です。必要に応じて、Key-Value ペアであるKafka レコードのヘッダーを含めることもできます。Pub/Sub Lite メッセージには次のフィールドがあります。

  • key: メッセージ キー(bytes
  • data: メッセージ データ (bytes)
  • attributes: 0 個以上の属性。各属性は (key,values[]) マップです。1 つの属性に複数の値を設定できます。
  • event_time: 任意のユーザー指定のイベント タイムスタンプ。

Kafka コネクトはコンバータを使用して、Kafka との間でキーと値をシリアル化します。 シリアル化を制御するには、コネクタ構成ファイルで次のプロパティを設定します。

  • key.converter: レコードキーをシリアル化するために使用されるコンバータ。
  • value.converter: レコード値をシリアル化するために使用されるコンバータ。

Kafka から Pub/Sub Lite への移行

シンクコネクタは、次のように Kafka レコードを Pub/Sub Lite メッセージに変換します。

Kafka レコード(SinkRecord Pub/Sub Lite メッセージ
キー key
data
ヘッダー attributes
タイムスタンプ eventTime
タイムスタンプ型 attributes["x-goog-pubsublite-source-kafka-event-time-type"]
トピック attributes["x-goog-pubsublite-source-kafka-topic"]
パーティション attributes["x-goog-pubsublite-source-kafka-offset"]
オフセット attributes["x-goog-pubsublite-source-kafka-partition"]

キー、値、ヘッダーは次のようにエンコードされます。

  • null スキーマは文字列スキーマとして扱われます。
  • バイト ペイロードは変換されずに直接書き込まれます。
  • 文字列、整数、浮動小数点のペイロードは、UTF-8 バイトのシーケンスにエンコードされます。
  • 他のすべてのペイロードは、プロトコル バッファ Value 型にエンコードされ、バイト文字列に変換されます。
    • ネストされた文字列フィールドは、protobuf Value にエンコードされます。
    • ネストされたバイト フィールドは、base64 でエンコードされたバイトを保持する protobuf Value にエンコードされます。
    • ネストされた数値フィールドは、protobuf Value に double としてエンコードされます。
    • 配列キー、マップキー、または構造体キーを持つマップはサポートされていません。

Pub/Sub Lite から Kafka への変換

ソース コネクタは、Pub/Sub Lite メッセージを次のように Kafka レコードに変換します。

Pub/Sub Lite メッセージ Kafka レコード(SourceRecord
key キー
data
attributes ヘッダー
event_time タイムスタンプ event_time が存在しない場合は、公開時刻が使用されます。

構成オプション

Kafka Connect API が提供する構成に加えて、コネクタは次の Pub/Sub Lite 構成をサポートしています。

シンク コネクタの構成オプション

シンク コネクタは、次の構成オプションをサポートしています。

設定 データ型 説明
connector.class String 必須。コネクタの Java クラス。Pub/Sub Lite シンクコネクタの場合、値は com.google.pubsublite.kafka.sink.PubSubLiteSinkConnector にする必要があります。
gcp.credentials.file.path String 省略可。Pub/Sub Lite を認証するための Google Cloud 認証情報を保存するファイルへのパス。
gcp.credentials.json String 省略可。Pub/Sub Lite を認証するための Google Cloud を含む JSON blob。
pubsublite.location String 必須。Pub/Sub Lite トピックのロケーション。
pubsublite.project String 必須。Pub/Sub Lite トピックを含む Google Cloud。
pubsublite.topic String 必須。Kafka レコードをパブリッシュする Pub/Sub Lite トピック。
topics String 必須。読み取る Kafka トピックのカンマ区切りのリスト。

ソース コネクタの構成オプション

ソース コネクタは、次の構成オプションをサポートしています。

設定 データ型 説明
connector.class String 必須。コネクタの Java クラス。Pub/Sub Lite ソースコネクタの場合、値は com.google.pubsublite.kafka.source.PubSubLiteSourceConnector にする必要があります。
gcp.credentials.file.path String 省略可。Pub/Sub Lite を認証するための Google Cloud 認証情報を保存するファイルへのパス。
gcp.credentials.json String 省略可。Pub/Sub Lite を認証するための Google Cloud を含む JSON blob。
kafka.topic String 必須。Pub/Sub Lite からメッセージを受信する Kafka トピック。
pubsublite.location String 必須。Pub/Sub Lite トピックのロケーション。
pubsublite.partition_flow_control.bytes Long

Pub/Sub Lite パーティションごとの未処理のバイトの最大数。

デフォルト: 20,000,000

pubsublite.partition_flow_control.messages Long

Pub/Sub Lite パーティションあたりの未処理のメッセージの最大数。

デフォルト: Long.MAX_VALUE

pubsublite.project String 必須。Pub/Sub Lite トピックを含む Google Cloud プロジェクト。
pubsublite.subscription String 必須。メッセージを pull する Pub/Sub Lite サブスクリプションの名前。

次のステップ