Pub/Sub を Apache Kafka に接続する

このドキュメントでは、Pub/Sub グループ Kafka コネクタを使用して Apache Kafka と Pub/Sub を統合する方法について説明します。

Pub/Sub Kafka コネクタの概要

Apache Kafka は、イベント ストリーミングのオープンソース プラットフォームです。分散型アーキテクチャでよく使用され、疎結合のコンポーネント間の通信を可能にします。Pub/Sub は、メッセージを非同期で送受信するためのマネージド サービスです。Kafka と同様に、Pub/Sub を使用してクラウド アーキテクチャ内のコンポーネント間で通信できます。

Pub/Sub グループの Kafka コネクタを使用すると、これら 2 つのシステムを統合できます。コネクタ JAR には、次のコネクタがパッケージ化されています。

  • シンクコネクタは、1 つ以上の Kafka トピックからレコードを読み取り、Pub/Sub に公開します。
  • ソースコネクタは、Pub/Sub トピックからメッセージを読み取り、Kafka にパブリッシュします。

Pub/Sub グループの Kafka コネクタを使用する可能性のあるいくつかのシナリオを次に示します。

  • Kafka ベースのアーキテクチャを Google Cloud に移行しようとしている。
  • Google Cloud の外部に Kafka にイベントを保存するフロントエンド システムがあるが、Kafka イベントを受信する必要がある一部のバックエンド サービスを実行するために Google Cloud も使用している。
  • オンプレミスの Kafka ソリューションからログを収集して、Google Cloud に送信してデータ分析を行う。
  • Google Cloud を使用するフロントエンド システムがあるが、Kafka を使用してオンプレミスにもデータを保存している。

このコネクタには、Kafka と他のシステム間でデータをストリーミングするためのフレームワークである Kafka コネクト が必要です。このコネクタを使用するには、Kafka クラスタとともに Kafka Connect を実行する必要があります。

このドキュメントでは、Kafka と Pub/Sub の両方に精通していることを前提としています。この文書を読む前に、Pub/Sub クイックスタートのいずれかを完了することをお勧めします。

Pub/Sub コネクタは、Google Cloud IAM と Kafka Connect ACL の統合をサポートしていません。

コネクタを使ってみる

このセクションでは、次のタスクについて説明します。

  1. Pub/Sub グループの Kafka コネクタを構成する
  2. Kafka から Pub/Sub にイベントを送信する。
  3. Pub/Sub から Kafka にメッセージを送信する。

前提条件

Kafka をインストールする

Apache Kafka クイックスタートの手順に沿って、ローカルマシンに単一ノード Kafka をインストールします。クイックスタートで、次の手順を行います。

  1. 最新の Kafka リリースをダウンロードして展開します。
  2. Kafka 環境を開始します。
  3. Kafka トピックを作成します。

認証

Pub/Sub メッセージを送受信するために、Pub/Sub グループの Kafka コネクタが Pub/Sub による認証を行う必要があります。認証を設定するには、次の手順を実行します。

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Create local authentication credentials for your user account:

    gcloud auth application-default login
  6. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  7. Install the Google Cloud CLI.
  8. To initialize the gcloud CLI, run the following command:

    gcloud init
  9. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  10. Create local authentication credentials for your user account:

    gcloud auth application-default login
  11. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.

コネクタ JAR をダウンロードする

ローカルマシンにコネクタの JAR ファイルをダウンロードします。詳細については、GitHub ReadMe のコネクタの取得をご覧ください。

コネクタ構成ファイルをコピーする

  1. コネクタの GitHub リポジトリのクローンを作成するか、ダウンロードします。

    git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git
    cd java-pubsub-group-kafka-connector
    
  2. config ディレクトリの内容を Kafka インストールの config サブディレクトリにコピーします。

    cp config/* [path to Kafka installation]/config/
    

これらのファイルには、コネクタの構成設定が含まれています。

Kafka Connect 構成を更新する

  1. ダウンロードした Kafka コネクト バイナリが含まれているディレクトリに移動します。
  2. Kafka コネクト バイナリ ディレクトリにある config/connect-standalone.properties という名前のファイルをテキスト エディタで開きます。
  3. plugin.path property がコメントアウトされている場合は、コメント化解除します。
  4. plugin.path property を更新して、コネクタ JAR へのパスを追加します。

    例:

    plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
    
  5. offset.storage.file.filename プロパティをローカル ファイル名に設定します。スタンドアロン モードでは、Kafka はこのファイルを使用してオフセット データを保存します。

    例:

    offset.storage.file.filename=/tmp/connect.offsets
    

Kafka から Pub/Sub にイベントを転送する

このセクションでは、シンクコネクタを起動し、Kafka にイベントをパブリッシュして、Pub/Sub から転送されたメッセージを読み取る方法について説明します。

  1. Google Cloud CLI を使用して、サブスクリプションを含む Pub/Sub トピックを作成します。

    gcloud pubsub topics create PUBSUB_TOPIC
    gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC

    以下を置き換えます。

    • PUBSUB_TOPIC: Kafka からメッセージを受信する Pub/Sub トピックの名前。
    • PUBSUB_SUBSCRIPTION: トピックの Pub/Sub サブスクリプションの名前。
  2. テキスト エディタで /config/cps-sink-connector.properties というファイルを開きます。コメントで "TODO" とマークされている次のプロパティの値を追加します。

    topics=KAFKA_TOPICS
    cps.project=PROJECT_ID
    cps.topic=PUBSUB_TOPIC

    以下を置き換えます。

    • KAFKA_TOPICS: 読み取る Kafka トピックのカンマ区切りのリスト。
    • PROJECT_ID: Pub/Sub トピックを含む Google Cloud プロジェクト。
    • PUBSUB_TOPIC: Kafka からメッセージを受信する Pub/Sub トピック。
  3. Kafka ディレクトリから次のコマンドを実行します。

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/cps-sink-connector.properties
    
  4. Apache Kafka クイックスタートの手順に沿って、Kafka トピックにイベントを書き込みます。

  5. gcloud CLI を使用して Pub/Sub からイベントを読み取ります。

    gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack

Pub/Sub から Kafka にメッセージを転送する。

このセクションでは、ソースコネクタの起動、Pub/Sub へのメッセージのパブリッシュ、Kafka から転送されたメッセージの読み取りを行う方法について説明します。

  1. gcloud CLI を使用して、サブスクリプションで Pub/Sub トピックを作成します。

    gcloud pubsub topics create PUBSUB_TOPIC
    gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC

    以下を置き換えます。

    • PUBSUB_TOPIC: Pub/Sub トピックの名前。
    • PUBSUB_SUBSCRIPTION: Pub/Sub サブスクリプションの名前。
  2. テキスト エディタで /config/cps-source-connector.properties という名前のファイルを開きます。コメントで "TODO" とマークされている次のプロパティの値を追加します。

    kafka.topic=KAFKA_TOPIC
    cps.project=PROJECT_ID
    cps.subscription=PUBSUB_SUBSCRIPTION

    以下を置き換えます。

    • KAFKA_TOPIC: Pub/Sub メッセージを受信する Kafka トピック。
    • PROJECT_ID: Pub/Sub トピックを含む Google Cloud プロジェクト。
    • PUBSUB_TOPIC: Pub/Sub トピック。
  3. Kafka ディレクトリから次のコマンドを実行します。

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/cps-source-connector.properties
    
  4. メッセージを gcloud CLI を使用して Pub/Sub にパブリッシュします。

    gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
  5. Kafka からのメッセージを読み取ります。Apache Kafka クイックスタートの手順に沿って、Kafka トピックからメッセージを読み取ります。

メッセージ コンバージョン

Kafka レコードにはキーと値が含まれています。これらは可変長のバイト配列です。必要に応じて、Key-Value ペアであるKafka レコードのヘッダーを含めることもできます。Pub/Sub メッセージには、メッセージ本文と 0 個以上の Key-Value 属性の 2 つの主要な部分があります。

Kafka コネクトはコンバータを使用して、Kafka との間でキーと値をシリアル化します。 シリアル化を制御するには、コネクタ構成ファイルで次のプロパティを設定します。

  • key.converter: レコードキーをシリアル化するために使用されるコンバータ。
  • value.converter: レコード値をシリアル化するために使用されるコンバータ。

Pub/Sub メッセージの本文は ByteString オブジェクトであるため、最も効率的な変換はペイロードを直接コピーすることです。そのため、可能であれば、同じメッセージ本文のシリアル化解除と再シリアル化を防ぐため、プリミティブ データ型(整数、浮動小数点、文字列、バイトスキーマ)を生成するコンバータの使用をおすすめします。

Kafka から Pub/Sub への変換

シンクコネクタは、Kafka レコードを次のように Pub/Sub メッセージに変換します。

  • Kafka レコードキーは、Pub/Sub メッセージに "key" という名前の属性として保存されます。
  • デフォルトでは、コネクタは Kafka レコードのヘッダーをすべてドロップします。ただし、headers.publish 構成オプションを true に設定すると、コネクタはヘッダーを Pub/Sub 属性として書き込みます。コネクタは、Pub/Sub のメッセージ属性の制限を超えるヘッダーをスキップします。
  • 整数、浮動小数点数、文字列、バイト スキーマの場合、コネクタは Kafka レコード値のバイト数を Pub/Sub メッセージ本文に直接渡します。
  • 構造体スキーマの場合、コネクタは各フィールドを Pub/Sub メッセージの属性として書き込みます。たとえば、フィールドが { "id"=123 } の場合、生成される Pub/Sub メッセージには "id"="123" という属性が与えられます。フィールドの値は常に文字列に変換されます。マップ型と構造体型は、構造体内のフィールド型としてサポートされていません。
  • マップスキーマの場合、コネクタは各 Key-Value ペアを Pub/Sub メッセージの属性として書き込みます。たとえば、マップが {"alice"=1,"bob"=2} の場合、結果の Pub/Sub メッセージには "alice"="1""bob"="2" の 2 つの属性を持ちます。キーと値は文字列に変換されます。

構造体スキーマとマップスキーマには、次のような追加の動作があります。

  • 必要に応じて、messageBodyName 構成プロパティを設定することで、特定の構造体フィールドまたはマップキーをメッセージ本文に指定できます。フィールドやキーの値はメッセージ本文に ByteString として格納されます。messageBodyName を設定しない場合、構造体とマップのスキーマのメッセージ本文は空になります。

  • 配列値の場合、コネクタはプリミティブ配列タイプのみをサポートします。配列内の値の順序は、1 つの ByteString オブジェクトに連結されます。

Pub/Sub から Kafka への変換

ソースコネクタは、次のように Pub/Sub メッセージを Kafka レコードに変換します。

  • Kafka レコードキー: デフォルトでは、キーは null に設定されています。必要に応じて、kafka.key.attribute 構成オプションを設定して、キーとして使用する Pub/Sub メッセージ属性を指定できます。その場合、コネクタはその名前の属性を検索し、レコードキーを属性値に設定します。指定された属性が存在しない場合、レコードキーは null に設定されます。

  • Kafka レコード値. コネクタはレコード値を次のように書き込みます。

    • Pub/Sub メッセージにカスタム属性がない場合、コネクタは value.converter によって指定されたコンバータを使用して、Pub/Sub メッセージ本文を byte[] 型として Kafka レコード値に直接書き込みます。

    • Pub/Sub メッセージにカスタム属性があり、kafka.record.headersfalse の場合、コネクタはレコード値に構造体を書き込みます。この構造体は、属性ごとに 1 つのフィールドと、Pub/Sub メッセージ本文(バイトとして保存)を持つ "message" という名前のフィールドを含んでいます。

      {
        "message": "<Pub/Sub message body>",
        "<attribute-1>": "<value-1>",
        "<attribute-2>": "<value-2>",
        ....
      }
      

      この場合、struct スキーマと互換性のある value.converterorg.apache.kafka.connect.json.JsonConverter など)を使用する必要があります。

    • Pub/Sub メッセージにカスタム属性があり、kafka.record.headerstrue の場合、コネクタは属性を Kafka レコード ヘッダーとして書き込みます。value.converter で指定されたコンバータを使用して、Pub/Sub メッセージ本文を byte[] タイプとして Kafka レコード値に直接書き込みます。

  • Kafka レコード ヘッダー。デフォルトでは、kafka.record.headerstrue に設定しない限りヘッダーは空です。

構成オプション

Kafka Connect API が提供する構成に加えて、Pub/Sub グループの Kafka コネクタは、Pub/Sub コネクタの構成で説明されているシンクとソースの構成をサポートしています。

サポートの利用

サポートが必要な場合は、サポート チケットを作成してください。一般的な質問やディスカッションについては、GitHub リポジトリで問題を作成してください。

次のステップ