このドキュメントでは、Pub/Sub グループ Kafka コネクタを使用して Apache Kafka と Pub/Sub Lite を統合する方法について説明します。
Pub/Sub Kafka コネクタの概要
Apache Kafka は、イベント ストリーミングのオープンソース プラットフォームです。分散型アーキテクチャでよく使用され、疎結合のコンポーネント間の通信を可能にします。Pub/Sub Lite は、メッセージを非同期で送受信するためのマネージド サービスです。Kafka と同様に、Pub/Sub Lite を使用してクラウド アーキテクチャ内のコンポーネント間で通信できます。
Pub/Sub グループの Kafka コネクタを使用すると、これら 2 つのシステムを統合できます。以下のコネクタがコネクタ JAR にパッケージ化されています。
- シンクコネクタは、1 つ以上の Kafka トピックからレコードを読み取り、Pub/Sub Life に公開します。
- ソースコネクタは、Pub/Sub Lite トピックからメッセージを読み取り、Kafka に公開します。
Pub/Sub グループの Kafka コネクタを使用する可能性のあるいくつかのシナリオを次に示します。
- Kafka ベースのアーキテクチャを Google Cloud に移行しようとしている。
- Google Cloud の外部に Kafka にイベントを保存するフロントエンド システムがあるが、Kafka イベントを受信する必要がある一部のバックエンド サービスを実行するために Google Cloud も使用している。
- オンプレミスの Kafka ソリューションからログを収集して、Google Cloud に送信してデータ分析を行う。
- Google Cloud を使用するフロントエンド システムがあるが、Kafka を使用してオンプレミスにもデータを保存している。
このコネクタには、Kafka と他のシステム間でデータをストリーミングするためのフレームワークである Kafka コネクト が必要です。このコネクタを使用するには、Kafka クラスタとともに Kafka Connect を実行する必要があります。
このドキュメントは、Kafka と Pub/Sub Lite の両方に慣れていることを前提としています。Pub/Sub Lite の使用を開始するには、Google Cloud コンソールを使用して Pub/Sub Lite でメッセージを公開および受信するをご覧ください。
Pub/Sub グループの Kafka コネクタを使ってみる
このセクションでは、次のタスクについて説明します。- Pub/Sub グループの Kafka コネクタを構成する
- Kafka から Pub/Sub Lite にイベントを送信する。
- Pub/Sub Lite から Kafka にメッセージを送信する。
前提条件
Kafka をインストールする
Apache Kafka クイックスタートに沿って、ローカルマシンに単一ノードの Kafka をインストールします。クイックスタートで、次の手順を行います。
- 最新の Kafka リリースをダウンロードして展開します。
- Kafka 環境を起動します。
- Kafka トピックを作成します。
認証
Pub/Sub メッセージを送受信するために、Pub/Sub グループの Kafka コネクタが Pub/Sub による認証を行う必要があります。認証を設定するには、次の手順を実行します。
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
Install the Google Cloud CLI.
-
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Create local authentication credentials for your user account:
gcloud auth application-default login
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/pubsublite.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
Replace the following:
PROJECT_ID
: your project ID.USER_IDENTIFIER
: the identifier for your user account—for example,myemail@example.com
.ROLE
: the IAM role that you grant to your user account.
-
Install the Google Cloud CLI.
-
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Create local authentication credentials for your user account:
gcloud auth application-default login
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/pubsublite.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
Replace the following:
PROJECT_ID
: your project ID.USER_IDENTIFIER
: the identifier for your user account—for example,myemail@example.com
.ROLE
: the IAM role that you grant to your user account.
コネクタ JAR をダウンロードする
ローカルマシンにコネクタの JAR ファイルをダウンロードします。詳細については、GitHub ReadMe のコネクタの取得をご覧ください。
コネクタ構成ファイルをコピーする
コネクタの GitHub リポジトリのクローンを作成するか、ダウンロードします。
git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git cd java-pubsub-group-kafka-connector
config
ディレクトリの内容を Kafka インストールのconfig
サブディレクトリにコピーします。cp config/* [path to Kafka installation]/config/
これらのファイルには、コネクタの構成設定が含まれています。
Kafka Connect 構成を更新する
- ダウンロードした Kafka コネクト バイナリが含まれているディレクトリに移動します。
- Kafka コネクト バイナリ ディレクトリにある
config/connect-standalone.properties
という名前のファイルをテキスト エディタで開きます。 plugin.path property
がコメントアウトされている場合は、コメント化解除します。plugin.path property
を更新して、コネクタ JAR へのパスを追加します。例:
plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
offset.storage.file.filename
プロパティをローカル ファイル名に設定します。スタンドアロン モードでは、Kafka はこのファイルを使用してオフセット データを保存します。例:
offset.storage.file.filename=/tmp/connect.offsets
Kafka から Pub/Sub Lite にイベントを送信する
このセクションでは、シンクコネクタを起動し、Kafka にイベントをパブリッシュして、Pub/Sub Lite から転送されたメッセージを読み取る方法について説明します。
Google Cloud CLI を使用して Pub/Sub Lite 予約を作成します。
gcloud pubsub lite-reservations create RESERVATION_NAME \ --location=LOCATION \ --throughput-capacity=4
以下を置き換えます。
- RESERVATION_NAME: Pub/Sub Lite 予約の名前。
- LOCATION: 予約のロケーション
Google Cloud CLI を使用して、サブスクリプションを含む Pub/Sub Lite トピックを作成します。
gcloud pubsub lite-topics create LITE_TOPIC \ --location=LOCATION \ --partitions=2 \ --per-partition-bytes=30GiB \ --throughput-reservation=RESERVATION_NAME gcloud pubsub lite-subscriptions create LITE_SUBSCRIPTION \ --location=LOCATION \ --topic=LITE_TOPIC
以下を置き換えます。
- LITE_TOPIC: Kafka からメッセージを受信する Pub/Sub Lite トピックの名前。
- LOCATION: トピックのロケーション。値は予約のロケーションと一致する必要があります。
- RESERVATION_NAME: Pub/Sub Lite 予約の名前。
- LITE_SUBSCRIPTION: トピックの Pub/Sub Lite サブスクリプションの名前。
テキスト エディタで
/config/pubsub-lite-sink-connector.properties
というファイルを開きます。コメントで"TODO"
とマークされている次のプロパティの値を追加します。topics=KAFKA_TOPICS pubsublite.project=PROJECT_ID pubsublite.location=LOCATION pubsublite.topic=LITE_TOPIC
以下を置き換えます。
- KAFKA_TOPICS: 読み取る Kafka トピックのカンマ区切りのリスト。
- PROJECT_ID: Pub/Sub Lite トピックを含む Google Cloud プロジェクト。
- LOCATION: Pub/Sub Lite トピックのロケーション。
- LITE_TOPIC: Kafka からメッセージを受信する Pub/Sub Lite トピック。
Kafka ディレクトリから次のコマンドを実行します。
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/pubsub-lite-sink-connector.properties
Apache Kafka クイックスタートの手順に沿って、Kafka トピックにいくつかのイベントを書き込みます。
Lite サブスクリプションからメッセージを受信で示される方法のいずれかを使用して、Pub/Sub Lite サブスクリプションに登録します。
Pub/Sub Lite から Kafka にメッセージを転送する。
このセクションでは、ソースコネクタの起動、Pub/Sub Lite へのメッセージのパブリッシュ、Kafka から転送されたメッセージの読み取りを行う方法について説明します。
Google Cloud CLI を使用して Pub/Sub Lite 予約を作成します。
gcloud pubsub lite-reservations create RESERVATION_NAME \ --location=LOCATION \ --throughput-capacity=4
以下を置き換えます。
- RESERVATION_NAME: Pub/Sub Lite 予約の名前。
- LOCATION: 予約のロケーション
Google Cloud CLI を使用して、サブスクリプションを含む Pub/Sub Lite トピックを作成します。
gcloud pubsub lite-topics create LITE_TOPIC \ --location=LOCATION \ --partitions=2 \ --per-partition-bytes=30GiB \ --throughput-reservation=RESERVATION_NAME gcloud pubsub lite-subscriptions create LITE_SUBSCRIPTION \ --location=LOCATION \ --topic=LITE_TOPIC
以下を置き換えます。
- LITE_TOPIC: Pub/Sub Lite トピックの名前。
- LOCATION: トピックのロケーション。値は予約のロケーションと一致する必要があります。
- RESERVATION_NAME: Pub/Sub Lite 予約の名前。
- LITE_SUBSCRIPTION: トピックの Pub/Sub Lite サブスクリプションの名前。
/config/pubsub-lite-source-connector.properties
という名前のファイルをテキスト エディタで開きます。コメントで"TODO"
とマークされている次のプロパティの値を追加します。topic=KAFKA_TOPIC pubsublite.project=PROJECT_ID pubsublite.location=LOCATION pubsublite.subscription=LITE_SUBSCRIPTION
以下を置き換えます。
- KAFKA_TOPIC: Pub/Sub メッセージを受信する Kafka トピック。
- PROJECT_ID: Pub/Sub トピックを含む Google Cloud プロジェクト。
- LOCATION: Pub/Sub Lite トピックのロケーション。
- LITE_SUBSCRIPTION: Pub/Sub Lite トピック。
Kafka ディレクトリから次のコマンドを実行します。
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/pubsub-lite-source-connector.properties
Lite トピックにメッセージを公開で示される方法のいずれかを使用して、Pub/Sub Lite トピックにメッセージを公開します。
Kafka からのメッセージを読み取ります。Apache Kafka クイックスタートの手順に沿って、Kafka トピックからメッセージを読み取ります。
メッセージ コンバージョン
Kafka レコードにはキーと値が含まれています。これらは可変長のバイト配列です。必要に応じて、Key-Value ペアであるKafka レコードのヘッダーを含めることもできます。Pub/Sub Lite メッセージには次のフィールドがあります。
key
: メッセージ キー(bytes
)data
: メッセージ データ (bytes
)attributes
: 0 個以上の属性。各属性は(key,values[])
マップです。1 つの属性に複数の値を設定できます。event_time
: 任意のユーザー指定のイベント タイムスタンプ。
Kafka コネクトはコンバータを使用して、Kafka との間でキーと値をシリアル化します。 シリアル化を制御するには、コネクタ構成ファイルで次のプロパティを設定します。
key.converter
: レコードキーをシリアル化するために使用されるコンバータ。value.converter
: レコード値をシリアル化するために使用されるコンバータ。
Kafka から Pub/Sub Lite への移行
シンクコネクタは、次のように Kafka レコードを Pub/Sub Lite メッセージに変換します。
Kafka レコード(SinkRecord ) |
Pub/Sub Lite メッセージ |
---|---|
キー | key |
値 | data |
ヘッダー | attributes |
タイムスタンプ | eventTime |
タイムスタンプ型 | attributes["x-goog-pubsublite-source-kafka-event-time-type"] |
トピック | attributes["x-goog-pubsublite-source-kafka-topic"] |
パーティション | attributes["x-goog-pubsublite-source-kafka-offset"] |
オフセット | attributes["x-goog-pubsublite-source-kafka-partition"] |
キー、値、ヘッダーは次のようにエンコードされます。
- null スキーマは文字列スキーマとして扱われます。
- バイト ペイロードは変換されずに直接書き込まれます。
- 文字列、整数、浮動小数点のペイロードは、UTF-8 バイトのシーケンスにエンコードされます。
- 他のすべてのペイロードは、プロトコル バッファ
Value
型にエンコードされ、バイト文字列に変換されます。- ネストされた文字列フィールドは、protobuf
Value
にエンコードされます。 - ネストされたバイト フィールドは、base64 でエンコードされたバイトを保持する protobuf
Value
にエンコードされます。 - ネストされた数値フィールドは、protobuf
Value
に double としてエンコードされます。 - 配列キー、マップキー、または構造体キーを持つマップはサポートされていません。
- ネストされた文字列フィールドは、protobuf
Pub/Sub Lite から Kafka への変換
ソース コネクタは、Pub/Sub Lite メッセージを次のように Kafka レコードに変換します。
Pub/Sub Lite メッセージ | Kafka レコード(SourceRecord ) |
---|---|
key |
キー |
data |
値 |
attributes |
ヘッダー |
event_time |
タイムスタンプ event_time が存在しない場合は、公開時刻が使用されます。 |
構成オプション
Kafka Connect API が提供する構成に加えて、コネクタは次の Pub/Sub Lite 構成をサポートしています。
シンク コネクタの構成オプション
シンク コネクタは、次の構成オプションをサポートしています。
設定 | データ型 | 説明 |
---|---|---|
connector.class |
String |
必須。コネクタの Java クラス。Pub/Sub Lite シンクコネクタの場合、値は com.google.pubsublite.kafka.sink.PubSubLiteSinkConnector にする必要があります。 |
gcp.credentials.file.path |
String |
省略可。Pub/Sub Lite を認証するための Google Cloud 認証情報を保存するファイルへのパス。 |
gcp.credentials.json |
String |
省略可。Pub/Sub Lite を認証するための Google Cloud を含む JSON blob。 |
pubsublite.location |
String |
必須。Pub/Sub Lite トピックのロケーション。 |
pubsublite.project |
String |
必須。Pub/Sub Lite トピックを含む Google Cloud。 |
pubsublite.topic |
String |
必須。Kafka レコードをパブリッシュする Pub/Sub Lite トピック。 |
topics |
String |
必須。読み取る Kafka トピックのカンマ区切りのリスト。 |
ソース コネクタの構成オプション
ソース コネクタは、次の構成オプションをサポートしています。
設定 | データ型 | 説明 |
---|---|---|
connector.class |
String |
必須。コネクタの Java クラス。Pub/Sub Lite ソースコネクタの場合、値は com.google.pubsublite.kafka.source.PubSubLiteSourceConnector にする必要があります。 |
gcp.credentials.file.path |
String |
省略可。Pub/Sub Lite を認証するための Google Cloud 認証情報を保存するファイルへのパス。 |
gcp.credentials.json |
String |
省略可。Pub/Sub Lite を認証するための Google Cloud を含む JSON blob。 |
kafka.topic |
String |
必須。Pub/Sub Lite からメッセージを受信する Kafka トピック。 |
pubsublite.location |
String |
必須。Pub/Sub Lite トピックのロケーション。 |
pubsublite.partition_flow_control.bytes |
Long |
Pub/Sub Lite パーティションごとの未処理のバイトの最大数。 デフォルト: 20,000,000 |
pubsublite.partition_flow_control.messages |
Long |
Pub/Sub Lite パーティションあたりの未処理のメッセージの最大数。 デフォルト: |
pubsublite.project |
String |
必須。Pub/Sub Lite トピックを含む Google Cloud プロジェクト。 |
pubsublite.subscription |
String |
必須。メッセージを pull する Pub/Sub Lite サブスクリプションの名前。 |
次のステップ
- Kafka と Pub/Sub の違いについて。
- Pub/Sub グループの Kafka コネクタの詳細
- Pub/Sub グループの Kafka コネクタの GitHub リポジトリを確認する。